Skip to content

Commit

Permalink
Datastore handles creating objects atomically.
Browse files Browse the repository at this point in the history
In that commit, AtomicPutCreate takes previous = nil to Atomically create keys
that don't exist.  We need a create operation that is atomic to prevent races
between multiple libnetworks creating the same object.

Previously, we just created new KVs with an index of 0 and wrote them to the
datastore.  Consul accepts this behaviour and interprets index of 0 as
non-existing, but other data backends do no.

 - Add Exists() to the KV interface.  SetIndex() should also modify a KV so
   that it exists.
 - Call SetIndex() from within the GetObject() method on DataStore interface.
   - This ensures objects have the updated values for exists and index.
 - Add SetValue() to the KV interface.  This allows implementers to define
   their own method to marshall and unmarshall (as bitseq and allocator have).
 - Update existing users of the DataStore (endpoint, network, bitseq,
   allocator, ov_network) to new interfaces.
 - Fix UTs.
  • Loading branch information
Spike Curtis committed Jun 25, 2015
1 parent cc86aee commit e2a63df
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 15 additions & 19 deletions bitseq/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Handle struct {
app string
id string
dbIndex uint64
dbExists bool
store datastore.DataStore
sync.Mutex
}
Expand All @@ -54,18 +55,10 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
h.watchForChanges()

// Get the initial status from the ds if present.
// We will be getting an instance without a dbIndex
// (GetObject() does not set it): It is ok for now,
// it will only cause the first allocation on this
// node to go through a retry.
var bah []byte
if err := h.store.GetObject(datastore.Key(h.Key()...), &bah); err != nil {
if err != datastore.ErrKeyNotFound {
return nil, err
}
return h, nil
err := h.store.GetObject(datastore.Key(h.Key()...), h)
if err != datastore.ErrKeyNotFound {
return nil, err
}
err := h.FromByteArray(bah)

return h, err
}
Expand Down Expand Up @@ -199,7 +192,14 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) {
func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
// Create a copy of the current handler
h.Lock()
nh := &Handle{app: h.app, id: h.id, store: h.store, dbIndex: h.dbIndex, head: h.head.GetCopy()}
nh := &Handle{
app: h.app,
id: h.id,
store: h.store,
dbIndex: h.dbIndex,
head: h.head.GetCopy(),
dbExists: h.dbExists,
}
h.Unlock()

nh.head = PushReservation(bytePos, bitPos, nh.head, release)
Expand All @@ -214,7 +214,9 @@ func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error {
} else {
h.unselected--
}
h.dbIndex = nh.dbIndex
// Can't use SetIndex() since we're locked.
h.dbIndex = nh.Index()
h.dbExists = true
h.Unlock()
}

Expand Down Expand Up @@ -276,12 +278,6 @@ func (h *Handle) Unselected() uint32 {
return h.unselected
}

func (h *Handle) getDBIndex() uint64 {
h.Lock()
defer h.Unlock()
return h.dbIndex
}

// GetFirstAvailable looks for the first unset bit in passed mask
func GetFirstAvailable(head *Sequence) (int, int, error) {
byteIndex := 0
Expand Down
19 changes: 15 additions & 4 deletions bitseq/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (h *Handle) Value() []byte {
return jv
}

// SetValue unmarshals the data from the KV store
func (h *Handle) SetValue(value []byte) error {
return h.FromByteArray(value)
}

// Index returns the latest DB Index as seen by this object
func (h *Handle) Index() uint64 {
h.Lock()
Expand All @@ -49,9 +54,17 @@ func (h *Handle) Index() uint64 {
func (h *Handle) SetIndex(index uint64) {
h.Lock()
h.dbIndex = index
h.dbExists = true
h.Unlock()
}

// Exists method is true if this object has been stored in the DB.
func (h *Handle) Exists() bool {
h.Lock()
defer h.Unlock()
return h.dbExists
}

func (h *Handle) watchForChanges() error {
h.Lock()
store := h.store
Expand All @@ -70,14 +83,12 @@ func (h *Handle) watchForChanges() error {
select {
case kvPair := <-kvpChan:
// Only process remote update
if kvPair != nil && (kvPair.LastIndex != h.getDBIndex()) {
if kvPair != nil && (kvPair.LastIndex != h.Index()) {
err := h.fromDsValue(kvPair.Value)
if err != nil {
log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error())
} else {
h.Lock()
h.dbIndex = kvPair.LastIndex
h.Unlock()
h.SetIndex(kvPair.LastIndex)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

"github.com/BurntSushi/toml"
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/netlabel"
)

Expand Down Expand Up @@ -57,13 +58,15 @@ type Option func(c *Config)
// OptionDefaultNetwork function returns an option setter for a default network
func OptionDefaultNetwork(dn string) Option {
return func(c *Config) {
log.Infof("Option DefaultNetwork: %s", dn)
c.Daemon.DefaultNetwork = strings.TrimSpace(dn)
}
}

// OptionDefaultDriver function returns an option setter for default driver
func OptionDefaultDriver(dd string) Option {
return func(c *Config) {
log.Infof("Option DefaultDriver: %s", dd)
c.Daemon.DefaultDriver = strings.TrimSpace(dd)
}
}
Expand All @@ -82,13 +85,15 @@ func OptionLabels(labels []string) Option {
// OptionKVProvider function returns an option setter for kvstore provider
func OptionKVProvider(provider string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProvider: %s", provider)
c.Datastore.Client.Provider = strings.TrimSpace(provider)
}
}

// OptionKVProviderURL function returns an option setter for kvstore url
func OptionKVProviderURL(url string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProviderURL: %s", url)
c.Datastore.Client.Address = strings.TrimSpace(url)
}
}
Expand Down
1 change: 1 addition & 0 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}

if err := c.updateNetworkToStore(network); err != nil {
log.Warnf("couldnt create network %s: %v", network.name, err)
if e := network.Delete(); e != nil {
log.Warnf("couldnt cleanup network %s: %v", network.name, err)
}
Expand Down
37 changes: 20 additions & 17 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datastore

import (
"encoding/json"
"reflect"
"strings"

Expand All @@ -14,9 +13,7 @@ import (
//DataStore exported
type DataStore interface {
// GetObject gets data from datastore and unmarshals to the specified object
GetObject(key string, o interface{}) error
// GetUpdatedObject gets data from datastore along with its index and unmarshals to the specified object
GetUpdatedObject(key string, o interface{}) (uint64, error)
GetObject(key string, o KV) error
// PutObject adds a new Record based on an object into the datastore
PutObject(kvObject KV) error
// PutObjectAtomic provides an atomic add and update operation for a Record
Expand Down Expand Up @@ -49,10 +46,15 @@ type KV interface {
KeyPrefix() []string
// Value method lets an object to marshal its content to be stored in the KV store
Value() []byte
// SetValue is used by the datastore to set the object's value when loaded from the data store.
SetValue([]byte) error
// Index method returns the latest DB Index as seen by the object
Index() uint64
// SetIndex method allows the datastore to store the latest DB Index into the object
SetIndex(uint64)
// True if the object exists in the datastore, false if it hasn't been stored yet.
// When SetIndex() is called, the object has been stored.
Exists() bool
}

const (
Expand Down Expand Up @@ -121,7 +123,12 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error {
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}

previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
var previous *store.KVPair
if kvObject.Exists() {
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
} else {
previous = nil
}
_, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
if err != nil {
return err
Expand Down Expand Up @@ -149,24 +156,20 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
}

// GetObject returns a record matching the key
func (ds *datastore) GetObject(key string, o interface{}) error {
func (ds *datastore) GetObject(key string, o KV) error {
kvPair, err := ds.store.Get(key)
if err != nil {
return err
}
return json.Unmarshal(kvPair.Value, o)
}

// GetUpdateObject returns a record matching the key
func (ds *datastore) GetUpdatedObject(key string, o interface{}) (uint64, error) {
kvPair, err := ds.store.Get(key)
err = o.SetValue(kvPair.Value)
if err != nil {
return 0, err
}
if err := json.Unmarshal(kvPair.Value, o); err != nil {
return 0, err
return err
}
return kvPair.LastIndex, nil

// Make sure the object has a correct view of the DB index in case we need to modify it
// and update the DB.
o.SetIndex(kvPair.LastIndex)
return nil
}

// DeleteObject unconditionally deletes a record from the store
Expand Down
52 changes: 43 additions & 9 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/libnetwork/config"
_ "github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/options"
"github.com/stretchr/testify/assert"
)

var dummyKey = "dummy"
Expand Down Expand Up @@ -69,16 +70,18 @@ func TestKVObjectFlatKey(t *testing.T) {
func TestAtomicKVObjectFlatKey(t *testing.T) {
store := NewTestDataStore()
expected := dummyKVObject("1111", true)
assert.False(t, expected.Exists())
err := store.PutObjectAtomic(expected)
if err != nil {
t.Fatal(err)
}
assert.True(t, expected.Exists())

// PutObjectAtomic automatically sets the Index again. Hence the following must pass.

err = store.PutObjectAtomic(expected)
if err != nil {
t.Fatal("Atomic update with an older Index must fail")
t.Fatal("Atomic update should succeed.")
}

// Get the latest index and try PutObjectAtomic again for the same Key
Expand All @@ -90,12 +93,22 @@ func TestAtomicKVObjectFlatKey(t *testing.T) {
n := dummyObject{}
json.Unmarshal(data.Value, &n)
n.ID = "1111"
n.DBIndex = data.LastIndex
n.SetIndex(data.LastIndex)
n.ReturnValue = true
err = store.PutObjectAtomic(&n)
if err != nil {
t.Fatal(err)
}

// Get the Object using GetObject, then set again.
newObj := dummyObject{}
err = store.GetObject(Key(expected.Key()...), &newObj)
assert.True(t, newObj.Exists())
err = store.PutObjectAtomic(&n)
if err != nil {
t.Fatal(err)
}

}

// dummy data used to test the datastore
Expand All @@ -108,6 +121,7 @@ type dummyObject struct {
Generic options.Generic `kv:"iterative"`
ID string
DBIndex uint64
DBExists bool
ReturnValue bool
}

Expand All @@ -131,12 +145,21 @@ func (n *dummyObject) Value() []byte {
return b
}

func (n *dummyObject) SetValue(value []byte) error {
return json.Unmarshal(value, n)
}

func (n *dummyObject) Index() uint64 {
return n.DBIndex
}

func (n *dummyObject) SetIndex(index uint64) {
n.DBIndex = index
n.DBExists = true
}

func (n *dummyObject) Exists() bool {
return n.DBExists
}

func (n *dummyObject) MarshalJSON() ([]byte, error) {
Expand All @@ -162,10 +185,11 @@ func (n *dummyObject) UnmarshalJSON(b []byte) (err error) {

// dummy structure to test "recursive" cases
type recStruct struct {
Name string `kv:"leaf"`
Field1 int `kv:"leaf"`
Dict map[string]string `kv:"iterative"`
DBIndex uint64
Name string `kv:"leaf"`
Field1 int `kv:"leaf"`
Dict map[string]string `kv:"iterative"`
DBIndex uint64
DBExists bool
}

func (r *recStruct) Key() []string {
Expand All @@ -179,12 +203,21 @@ func (r *recStruct) Value() []byte {
return b
}

func (r *recStruct) SetValue(value []byte) error {
return json.Unmarshal(value, r)
}

func (r *recStruct) Index() uint64 {
return r.DBIndex
}

func (r *recStruct) SetIndex(index uint64) {
r.DBIndex = index
r.DBExists = true
}

func (r *recStruct) Exists() bool {
return r.DBExists
}

func dummyKVObject(id string, retValue bool) *dummyObject {
Expand All @@ -195,12 +228,13 @@ func dummyKVObject(id string, retValue bool) *dummyObject {
Name: "testNw",
NetworkType: "bridge",
EnableIPv6: true,
Rec: &recStruct{"gen", 5, cDict, 0},
Rec: &recStruct{"gen", 5, cDict, 0, false},
ID: id,
DBIndex: 0,
ReturnValue: retValue}
ReturnValue: retValue,
DBExists: false}
generic := make(map[string]interface{})
generic["label1"] = &recStruct{"value1", 1, cDict, 0}
generic["label1"] = &recStruct{"value1", 1, cDict, 0, false}
generic["label2"] = "subnet=10.1.1.0/16"
n.Generic = generic
return &n
Expand Down

0 comments on commit e2a63df

Please sign in to comment.