diff --git a/CHANGELOG.md b/CHANGELOG.md index f6b6533ce..bad4666af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,14 @@ ## [0.3.8](https://github.com/arangodb/kube-arangodb/tree/0.3.8) (2019-02-19) [Full Changelog](https://github.com/arangodb/kube-arangodb/compare/0.3.6...0.3.7) +- Wait for shards to be in sync before continuing upgrade process. +- Rotate members when patch-level upgrade. +- Don't trigger cleanout server during upgrade. +- More robust remove-server actions. + +## [0.3.8](https://github.com/arangodb/kube-arangodb/tree/0.3.8) (2019-02-19) +[Full Changelog](https://github.com/arangodb/kube-arangodb/compare/0.3.6...0.3.7) + - Added scaling limits to spec and enforce in operator. - npm update for dashboard to alleviate security problems. - Added bare metal walk through to documentation. diff --git a/deps/github.com/arangodb/go-driver/Makefile b/deps/github.com/arangodb/go-driver/Makefile index d49c309ef..b3ec850b9 100644 --- a/deps/github.com/arangodb/go-driver/Makefile +++ b/deps/github.com/arangodb/go-driver/Makefile @@ -325,6 +325,7 @@ __test_go_test: -e TEST_CVERSION=$(TEST_CVERSION) \ -e TEST_CONTENT_TYPE=$(TEST_CONTENT_TYPE) \ -e TEST_PPROF=$(TEST_PPROF) \ + -e TEST_MODE=$(TEST_MODE) \ -w /usr/code/ \ golang:$(GOVERSION) \ go test $(TAGS) $(TESTOPTIONS) $(TESTVERBOSEOPTIONS) $(TESTS) @@ -337,7 +338,7 @@ ifdef JWTSECRET echo "$JWTSECRET" > "${JWTSECRETFILE}" endif @-docker rm -f -v $(TESTCONTAINER) &> /dev/null - @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR=${GOBUILDDIR} $(CLUSTERENV) $(ROOTDIR)/test/cluster.sh start + @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) ARANGO_LICENSE_KEY=$(ARANGO_LICENSE_KEY) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR=${GOBUILDDIR} $(CLUSTERENV) $(ROOTDIR)/test/cluster.sh start endif __test_cleanup: diff --git a/deps/github.com/arangodb/go-driver/client.go b/deps/github.com/arangodb/go-driver/client.go index f1f4a7cee..d78dc2ad3 100644 
--- a/deps/github.com/arangodb/go-driver/client.go +++ b/deps/github.com/arangodb/go-driver/client.go @@ -100,6 +100,10 @@ type VersionInfo struct { Details map[string]interface{} `json:"details,omitempty"` } +func (v *VersionInfo) IsEnterprise() bool { + return v.License == "enterprise" +} + // String creates a string representation of the given VersionInfo. func (v VersionInfo) String() string { result := fmt.Sprintf("%s, version %s, license %s", v.Server, v.Version, v.License) diff --git a/deps/github.com/arangodb/go-driver/cluster.go b/deps/github.com/arangodb/go-driver/cluster.go index 51cf3d977..cd4b14040 100644 --- a/deps/github.com/arangodb/go-driver/cluster.go +++ b/deps/github.com/arangodb/go-driver/cluster.go @@ -64,19 +64,40 @@ type ClusterHealth struct { Health map[ServerID]ServerHealth `json:"Health"` } +// ServerSyncStatus describes the servers sync status +type ServerSyncStatus string + +const ( + ServerSyncStatusUnknown ServerSyncStatus = "UNKNOWN" + ServerSyncStatusUndefined ServerSyncStatus = "UNDEFINED" + ServerSyncStatusStartup ServerSyncStatus = "STARTUP" + ServerSyncStatusStopping ServerSyncStatus = "STOPPING" + ServerSyncStatusStopped ServerSyncStatus = "STOPPED" + ServerSyncStatusServing ServerSyncStatus = "SERVING" + ServerSyncStatusShutdown ServerSyncStatus = "SHUTDOWN" +) + // ServerHealth contains health information of a single server in a cluster. 
type ServerHealth struct { - Endpoint string `json:"Endpoint"` - LastHeartbeatAcked time.Time `json:"LastHeartbeatAcked"` - LastHeartbeatSent time.Time `json:"LastHeartbeatSent"` - LastHeartbeatStatus string `json:"LastHeartbeatStatus"` - Role ServerRole `json:"Role"` - ShortName string `json:"ShortName"` - Status ServerStatus `json:"Status"` - CanBeDeleted bool `json:"CanBeDeleted"` - HostID string `json:"Host,omitempty"` - Version Version `json:"Version,omitempty"` - Engine EngineType `json:"Engine,omitempty"` + Endpoint string `json:"Endpoint"` + LastHeartbeatAcked time.Time `json:"LastHeartbeatAcked"` + LastHeartbeatSent time.Time `json:"LastHeartbeatSent"` + LastHeartbeatStatus string `json:"LastHeartbeatStatus"` + Role ServerRole `json:"Role"` + ShortName string `json:"ShortName"` + Status ServerStatus `json:"Status"` + CanBeDeleted bool `json:"CanBeDeleted"` + HostID string `json:"Host,omitempty"` + Version Version `json:"Version,omitempty"` + Engine EngineType `json:"Engine,omitempty"` + SyncStatus ServerSyncStatus `json:"SyncStatus,omitempty"` + + // Only for Coordinators + AdvertisedEndpoint *string `json:"AdvertisedEndpoint,omitempty"` + + // Only for Agents + Leader *string `json:"Leader,omitempty"` + Leading *bool `json:"Leading,omitempty"` } // ServerStatus describes the health status of a server @@ -146,6 +167,7 @@ type InventoryCollection struct { Indexes []InventoryIndex `json:"indexes,omitempty"` PlanVersion int64 `json:"planVersion,omitempty"` IsReady bool `json:"isReady,omitempty"` + AllInSync bool `json:"allInSync,omitempty"` } // IndexByFieldsAndType returns the InventoryIndex with given fields & type. 
@@ -189,6 +211,11 @@ type InventoryCollectionParameters struct { DistributeShardsLike string `json:"distributeShardsLike,omitempty"` } +// IsSatellite returns true if the collection is a satellite collection +func (icp *InventoryCollectionParameters) IsSatellite() bool { + return icp.ReplicationFactor == ReplicationFactorSatellite +} + // ShardID is an internal identifier of a specific shard type ShardID string diff --git a/deps/github.com/arangodb/go-driver/cluster_impl.go b/deps/github.com/arangodb/go-driver/cluster_impl.go index 0ddd307f0..4ec7d67da 100644 --- a/deps/github.com/arangodb/go-driver/cluster_impl.go +++ b/deps/github.com/arangodb/go-driver/cluster_impl.go @@ -24,7 +24,9 @@ package driver import ( "context" + "encoding/json" "path" + "reflect" ) // newCluster creates a new Cluster implementation. @@ -221,3 +223,151 @@ func (c *cluster) RemoveServer(ctx context.Context, serverID ServerID) error { } return nil } + +// replicationFactor represents the replication factor of a collection +// Has special value ReplicationFactorSatellite for satellite collections +type replicationFactor int + +type inventoryCollectionParametersInternal struct { + Deleted bool `json:"deleted,omitempty"` + DoCompact bool `json:"doCompact,omitempty"` + ID string `json:"id,omitempty"` + IndexBuckets int `json:"indexBuckets,omitempty"` + Indexes []InventoryIndex `json:"indexes,omitempty"` + IsSmart bool `json:"isSmart,omitempty"` + SmartGraphAttribute string `json:"smartGraphAttribute,omitempty"` + IsSystem bool `json:"isSystem,omitempty"` + IsVolatile bool `json:"isVolatile,omitempty"` + JournalSize int64 `json:"journalSize,omitempty"` + KeyOptions struct { + Type string `json:"type,omitempty"` + AllowUserKeys bool `json:"allowUserKeys,omitempty"` + LastValue int64 `json:"lastValue,omitempty"` + } `json:"keyOptions"` + Name string `json:"name,omitempty"` + NumberOfShards int `json:"numberOfShards,omitempty"` + Path string `json:"path,omitempty"` + PlanID string 
`json:"planId,omitempty"` + ReplicationFactor replicationFactor `json:"replicationFactor,omitempty"` + ShardKeys []string `json:"shardKeys,omitempty"` + Shards map[ShardID][]ServerID `json:"shards,omitempty"` + Status CollectionStatus `json:"status,omitempty"` + Type CollectionType `json:"type,omitempty"` + WaitForSync bool `json:"waitForSync,omitempty"` + DistributeShardsLike string `json:"distributeShardsLike,omitempty"` +} + +func (p *InventoryCollectionParameters) asInternal() inventoryCollectionParametersInternal { + return inventoryCollectionParametersInternal{ + Deleted: p.Deleted, + DoCompact: p.DoCompact, + ID: p.ID, + IndexBuckets: p.IndexBuckets, + Indexes: p.Indexes, + IsSmart: p.IsSmart, + SmartGraphAttribute: p.SmartGraphAttribute, + IsSystem: p.IsSystem, + IsVolatile: p.IsVolatile, + JournalSize: p.JournalSize, + KeyOptions: p.KeyOptions, + Name: p.Name, + NumberOfShards: p.NumberOfShards, + Path: p.Path, + PlanID: p.PlanID, + ReplicationFactor: replicationFactor(p.ReplicationFactor), + ShardKeys: p.ShardKeys, + Shards: p.Shards, + Status: p.Status, + Type: p.Type, + WaitForSync: p.WaitForSync, + DistributeShardsLike: p.DistributeShardsLike, + } +} + +func (p *InventoryCollectionParameters) fromInternal(i inventoryCollectionParametersInternal) { + *p = i.asExternal() +} + +func (p *inventoryCollectionParametersInternal) asExternal() InventoryCollectionParameters { + return InventoryCollectionParameters{ + Deleted: p.Deleted, + DoCompact: p.DoCompact, + ID: p.ID, + IndexBuckets: p.IndexBuckets, + Indexes: p.Indexes, + IsSmart: p.IsSmart, + SmartGraphAttribute: p.SmartGraphAttribute, + IsSystem: p.IsSystem, + IsVolatile: p.IsVolatile, + JournalSize: p.JournalSize, + KeyOptions: p.KeyOptions, + Name: p.Name, + NumberOfShards: p.NumberOfShards, + Path: p.Path, + PlanID: p.PlanID, + ReplicationFactor: int(p.ReplicationFactor), + ShardKeys: p.ShardKeys, + Shards: p.Shards, + Status: p.Status, + Type: p.Type, + WaitForSync: p.WaitForSync, + 
DistributeShardsLike: p.DistributeShardsLike, + } +} + +// MarshalJSON converts InventoryCollectionParameters into json +func (p *InventoryCollectionParameters) MarshalJSON() ([]byte, error) { + return json.Marshal(p.asInternal()) +} + +// UnmarshalJSON loads InventoryCollectionParameters from json +func (p *InventoryCollectionParameters) UnmarshalJSON(d []byte) error { + var internal inventoryCollectionParametersInternal + if err := json.Unmarshal(d, &internal); err != nil { + return err + } + + p.fromInternal(internal) + return nil +} + +const ( + replicationFactorSatelliteString string = "satellite" +) + +// MarshalJSON marshals replicationFactor to its arangodb json representation +func (r replicationFactor) MarshalJSON() ([]byte, error) { + var replicationFactor interface{} + + if int(r) == ReplicationFactorSatellite { + replicationFactor = replicationFactorSatelliteString + } else { + replicationFactor = int(r) + } + + return json.Marshal(replicationFactor) +} + +// UnmarshalJSON unmarshals replicationFactor from its arangodb json representation +func (r *replicationFactor) UnmarshalJSON(d []byte) error { + var internal interface{} + + if err := json.Unmarshal(d, &internal); err != nil { + return err + } + + if i, ok := internal.(float64); ok { + *r = replicationFactor(i) + return nil + } else if str, ok := internal.(string); ok { + if ok && str == replicationFactorSatelliteString { + *r = replicationFactor(ReplicationFactorSatellite) + return nil + } + } + + return &json.UnmarshalTypeError{ + Value: string(d), + Type: reflect.TypeOf(r).Elem(), + } +} diff --git a/deps/github.com/arangodb/go-driver/collection.go b/deps/github.com/arangodb/go-driver/collection.go index ed0ffa24d..1ba758a32 100644 --- a/deps/github.com/arangodb/go-driver/collection.go +++ b/deps/github.com/arangodb/go-driver/collection.go @@ -118,6 +118,16 @@ type CollectionProperties struct { ReplicationFactor int `json:"replicationFactor,omitempty"` } +const ( + //
ReplicationFactorSatellite represents a satellite collection's replication factor + ReplicationFactorSatellite int = -1 +) + +// IsSatellite returns true if the collection is a satellite collection +func (p *CollectionProperties) IsSatellite() bool { + return p.ReplicationFactor == ReplicationFactorSatellite +} + // SetCollectionPropertiesOptions contains data for Collection.SetProperties. type SetCollectionPropertiesOptions struct { // If true then creating or changing a document will wait until the data has been synchronized to disk. diff --git a/deps/github.com/arangodb/go-driver/collection_impl.go b/deps/github.com/arangodb/go-driver/collection_impl.go index 00264ca76..77cb9ae80 100644 --- a/deps/github.com/arangodb/go-driver/collection_impl.go +++ b/deps/github.com/arangodb/go-driver/collection_impl.go @@ -24,6 +24,7 @@ package driver import ( "context" + "encoding/json" "path" ) @@ -163,11 +164,11 @@ func (c *collection) Properties(ctx context.Context) (CollectionProperties, erro if err := resp.CheckStatus(200); err != nil { return CollectionProperties{}, WithStack(err) } - var data CollectionProperties + var data collectionPropertiesInternal if err := resp.ParseBody("", &data); err != nil { return CollectionProperties{}, WithStack(err) } - return data, nil + return data.asExternal(), nil } // SetProperties changes properties of the collection. 
@@ -176,7 +177,7 @@ func (c *collection) SetProperties(ctx context.Context, options SetCollectionPro if err != nil { return WithStack(err) } - if _, err := req.SetBody(options); err != nil { + if _, err := req.SetBody(options.asInternal()); err != nil { return WithStack(err) } resp, err := c.conn.Do(ctx, req) @@ -263,3 +264,106 @@ func (c *collection) Truncate(ctx context.Context) error { } return nil } + +type collectionPropertiesInternal struct { + CollectionInfo + WaitForSync bool `json:"waitForSync,omitempty"` + DoCompact bool `json:"doCompact,omitempty"` + JournalSize int64 `json:"journalSize,omitempty"` + KeyOptions struct { + Type KeyGeneratorType `json:"type,omitempty"` + AllowUserKeys bool `json:"allowUserKeys,omitempty"` + } `json:"keyOptions,omitempty"` + NumberOfShards int `json:"numberOfShards,omitempty"` + ShardKeys []string `json:"shardKeys,omitempty"` + ReplicationFactor replicationFactor `json:"replicationFactor,omitempty"` +} + +func (p *collectionPropertiesInternal) asExternal() CollectionProperties { + return CollectionProperties{ + CollectionInfo: p.CollectionInfo, + WaitForSync: p.WaitForSync, + DoCompact: p.DoCompact, + JournalSize: p.JournalSize, + KeyOptions: p.KeyOptions, + NumberOfShards: p.NumberOfShards, + ShardKeys: p.ShardKeys, + ReplicationFactor: int(p.ReplicationFactor), + } +} + +func (p *CollectionProperties) asInternal() collectionPropertiesInternal { + return collectionPropertiesInternal{ + CollectionInfo: p.CollectionInfo, + WaitForSync: p.WaitForSync, + DoCompact: p.DoCompact, + JournalSize: p.JournalSize, + KeyOptions: p.KeyOptions, + NumberOfShards: p.NumberOfShards, + ShardKeys: p.ShardKeys, + ReplicationFactor: replicationFactor(p.ReplicationFactor), + } +} + +func (p *CollectionProperties) fromInternal(i *collectionPropertiesInternal) { + p.CollectionInfo = i.CollectionInfo + p.WaitForSync = i.WaitForSync + p.DoCompact = i.DoCompact + p.JournalSize = i.JournalSize + p.KeyOptions = i.KeyOptions + p.NumberOfShards = 
i.NumberOfShards + p.ShardKeys = i.ShardKeys + p.ReplicationFactor = int(i.ReplicationFactor) +} + +// MarshalJSON converts CollectionProperties into json +func (p *CollectionProperties) MarshalJSON() ([]byte, error) { + return json.Marshal(p.asInternal()) +} + +// UnmarshalJSON loads CollectionProperties from json +func (p *CollectionProperties) UnmarshalJSON(d []byte) error { + var internal collectionPropertiesInternal + if err := json.Unmarshal(d, &internal); err != nil { + return err + } + + p.fromInternal(&internal) + return nil +} + +type setCollectionPropertiesOptionsInternal struct { + WaitForSync *bool `json:"waitForSync,omitempty"` + JournalSize int64 `json:"journalSize,omitempty"` + ReplicationFactor replicationFactor `json:"replicationFactor,omitempty"` +} + +func (p *SetCollectionPropertiesOptions) asInternal() setCollectionPropertiesOptionsInternal { + return setCollectionPropertiesOptionsInternal{ + WaitForSync: p.WaitForSync, + JournalSize: p.JournalSize, + ReplicationFactor: replicationFactor(p.ReplicationFactor), + } +} + +func (p *SetCollectionPropertiesOptions) fromInternal(i *setCollectionPropertiesOptionsInternal) { + p.WaitForSync = i.WaitForSync + p.JournalSize = i.JournalSize + p.ReplicationFactor = int(i.ReplicationFactor) +} + +// MarshalJSON converts SetCollectionPropertiesOptions into json +func (p *SetCollectionPropertiesOptions) MarshalJSON() ([]byte, error) { + return json.Marshal(p.asInternal()) +} + +// UnmarshalJSON loads SetCollectionPropertiesOptions from json +func (p *SetCollectionPropertiesOptions) UnmarshalJSON(d []byte) error { + var internal setCollectionPropertiesOptionsInternal + if err := json.Unmarshal(d, &internal); err != nil { + return err + } + + p.fromInternal(&internal) + return nil +} diff --git a/deps/github.com/arangodb/go-driver/database_collections_impl.go b/deps/github.com/arangodb/go-driver/database_collections_impl.go index 3eae4c5c6..5289d4bfe 100644 --- 
a/deps/github.com/arangodb/go-driver/database_collections_impl.go +++ b/deps/github.com/arangodb/go-driver/database_collections_impl.go @@ -101,17 +101,32 @@ func (d *database) Collections(ctx context.Context) ([]Collection, error) { return result, nil } +type createCollectionOptionsInternal struct { + JournalSize int `json:"journalSize,omitempty"` + ReplicationFactor replicationFactor `json:"replicationFactor,omitempty"` + WaitForSync bool `json:"waitForSync,omitempty"` + DoCompact *bool `json:"doCompact,omitempty"` + IsVolatile bool `json:"isVolatile,omitempty"` + ShardKeys []string `json:"shardKeys,omitempty"` + NumberOfShards int `json:"numberOfShards,omitempty"` + IsSystem bool `json:"isSystem,omitempty"` + Type CollectionType `json:"type,omitempty"` + IndexBuckets int `json:"indexBuckets,omitempty"` + KeyOptions *CollectionKeyOptions `json:"keyOptions,omitempty"` + DistributeShardsLike string `json:"distributeShardsLike,omitempty"` + IsSmart bool `json:"isSmart,omitempty"` + SmartGraphAttribute string `json:"smartGraphAttribute,omitempty"` + Name string `json:"name"` +} + // CreateCollection creates a new collection with given name and options, and opens a connection to it. // If a collection with given name already exists within the database, a DuplicateError is returned. 
func (d *database) CreateCollection(ctx context.Context, name string, options *CreateCollectionOptions) (Collection, error) { - input := struct { - CreateCollectionOptions - Name string `json:"name"` - }{ + input := createCollectionOptionsInternal{ Name: name, } if options != nil { - input.CreateCollectionOptions = *options + input.fromExternal(options) } req, err := d.conn.NewRequest("POST", path.Join(d.relPath(), "_api/collection")) if err != nil { @@ -134,3 +149,55 @@ func (d *database) CreateCollection(ctx context.Context, name string, options *C } return col, nil } + +// func (p *CreateCollectionOptions) asInternal() createCollectionOptionsInternal { +// return createCollectionOptionsInternal{ +// JournalSize: p.JournalSize, +// ReplicationFactor: replicationFactor(p.ReplicationFactor), +// WaitForSync: p.WaitForSync, +// DoCompact: p.DoCompact, +// IsVolatile: p.IsVolatile, +// ShardKeys: p.ShardKeys, +// NumberOfShards: p.NumberOfShards, +// IsSystem: p.IsSystem, +// Type: p.Type, +// IndexBuckets: p.IndexBuckets, +// KeyOptions: p.KeyOptions, +// DistributeShardsLike: p.DistributeShardsLike, +// IsSmart: p.IsSmart, +// SmartGraphAttribute: p.SmartGraphAttribute, +// } +// } + +func (p *createCollectionOptionsInternal) fromExternal(i *CreateCollectionOptions) { + p.JournalSize = i.JournalSize + p.ReplicationFactor = replicationFactor(i.ReplicationFactor) + p.WaitForSync = i.WaitForSync + p.DoCompact = i.DoCompact + p.IsVolatile = i.IsVolatile + p.ShardKeys = i.ShardKeys + p.NumberOfShards = i.NumberOfShards + p.IsSystem = i.IsSystem + p.Type = i.Type + p.IndexBuckets = i.IndexBuckets + p.KeyOptions = i.KeyOptions + p.DistributeShardsLike = i.DistributeShardsLike + p.IsSmart = i.IsSmart + p.SmartGraphAttribute = i.SmartGraphAttribute +} + +// // MarshalJSON converts CreateCollectionOptions into json +// func (p *CreateCollectionOptions) MarshalJSON() ([]byte, error) { +// return json.Marshal(p.asInternal()) +// } + +// // UnmarshalJSON loads 
CreateCollectionOptions from json +// func (p *CreateCollectionOptions) UnmarshalJSON(d []byte) error { +// var internal createCollectionOptionsInternal +// if err := json.Unmarshal(d, &internal); err != nil { +// return err +// } + +// p.fromInternal(&internal) +// return nil +// } diff --git a/deps/github.com/arangodb/go-driver/test/cluster.sh b/deps/github.com/arangodb/go-driver/test/cluster.sh index 356dbd435..8acfa877c 100755 --- a/deps/github.com/arangodb/go-driver/test/cluster.sh +++ b/deps/github.com/arangodb/go-driver/test/cluster.sh @@ -39,6 +39,9 @@ if [ "$CMD" == "start" ]; then if [ "$SSL" == "auto" ]; then STARTERARGS="$STARTERARGS --ssl.auto-key" fi + if [ -n "$ARANGO_LICENSE_KEY" ]; then + DOCKERARGS="$DOCKERARGS -e ARANGO_LICENSE_KEY=$ARANGO_LICENSE_KEY" + fi # Start network namespace docker run -d --name=${NAMESPACE} alpine:3.4 sleep 365d diff --git a/deps/github.com/arangodb/go-driver/test/cluster_test.go b/deps/github.com/arangodb/go-driver/test/cluster_test.go index 96af296a6..506fa99f6 100644 --- a/deps/github.com/arangodb/go-driver/test/cluster_test.go +++ b/deps/github.com/arangodb/go-driver/test/cluster_test.go @@ -119,6 +119,57 @@ func TestClusterDatabaseInventory(t *testing.T) { } } +// TestClusterDatabaseInventorySatellite tests the Cluster.DatabaseInventory method with satellite collections +func TestClusterDatabaseInventorySatellite(t *testing.T) { + skipNoEnterprise(t) + name := "satellite_collection_dbinv" + ctx := context.Background() + c := createClientFromEnv(t, true) + cl, err := c.Cluster(ctx) + if driver.IsPreconditionFailed(err) { + t.Skip("Not a cluster") + } else { + db, err := c.Database(ctx, "_system") + if err != nil { + t.Fatalf("Failed to open _system database: %s", describe(err)) + } + ensureCollection(ctx, db, name, &driver.CreateCollectionOptions{ + ReplicationFactor: driver.ReplicationFactorSatellite, + }, t) + h, err := cl.Health(ctx) + if err != nil { + t.Fatalf("Health failed: %s", describe(err)) + } + inv, err :=
cl.DatabaseInventory(ctx, db) + if err != nil { + t.Fatalf("DatabaseInventory failed: %s", describe(err)) + } + if len(inv.Collections) == 0 { + t.Error("Expected multiple collections, got 0") + } + foundSatellite := false + for _, col := range inv.Collections { + if len(col.Parameters.Shards) == 0 { + t.Errorf("Expected 1 or more shards in collection %s, got 0", col.Parameters.Name) + } + if col.Parameters.IsSatellite() { + foundSatellite = true + } + for shardID, dbServers := range col.Parameters.Shards { + for _, serverID := range dbServers { + if _, found := h.Health[serverID]; !found { + t.Errorf("Unexpected dbserver ID for shard '%s': %s", shardID, serverID) + } + } + } + } + + if !foundSatellite { + t.Errorf("No satellite collection.") + } + } +} + // TestClusterMoveShard tests the Cluster.MoveShard method. func TestClusterMoveShard(t *testing.T) { ctx := context.Background() diff --git a/deps/github.com/arangodb/go-driver/test/collection_test.go b/deps/github.com/arangodb/go-driver/test/collection_test.go index f2d6113cc..b5a8764c4 100644 --- a/deps/github.com/arangodb/go-driver/test/collection_test.go +++ b/deps/github.com/arangodb/go-driver/test/collection_test.go @@ -73,6 +73,44 @@ func TestCreateCollection(t *testing.T) { } } +// TestCreateSatelliteCollection create a satellite collection +func TestCreateSatelliteCollection(t *testing.T) { + skipNoEnterprise(t) + c := createClientFromEnv(t, true) + _, err := c.Cluster(nil) + if driver.IsPreconditionFailed(err) { + t.Skipf("Not a cluster") + } else if err != nil { + t.Fatalf("Failed to get cluster: %s", describe(err)) + } + db := ensureDatabase(nil, c, "collection_test", nil, t) + name := "test_create_collection_satellite" + options := driver.CreateCollectionOptions{ + ReplicationFactor: driver.ReplicationFactorSatellite, + } + if _, err := db.CreateCollection(nil, name, &options); err != nil { + t.Fatalf("Failed to create collection '%s': %s", name, describe(err)) + } + // Collection must exist now + if 
found, err := db.CollectionExists(nil, name); err != nil { + t.Errorf("CollectionExists('%s') failed: %s", name, describe(err)) + } else if !found { + t.Errorf("CollectionExists('%s') return false, expected true", name) + } + // Check if the collection is a satellite collection + if col, err := db.Collection(nil, name); err != nil { + t.Errorf("Collection('%s') failed: %s", name, describe(err)) + } else { + if prop, err := col.Properties(nil); err != nil { + t.Errorf("Properties() failed: %s", describe(err)) + } else { + if !prop.IsSatellite() { + t.Errorf("Collection %s is not satellite", name) + } + } + } +} + // TestRemoveCollection creates a collection and then removes it. func TestRemoveCollection(t *testing.T) { c := createClientFromEnv(t, true) @@ -344,6 +382,40 @@ func TestCollectionSetProperties(t *testing.T) { } } +func TestCollectionSetPropertiesSatellite(t *testing.T) { + skipNoEnterprise(t) + c := createClientFromEnv(t, true) + + // Test replication factor + if _, err := c.Cluster(nil); err == nil { + + db := ensureDatabase(nil, c, "collection_test_satellite", nil, t) + name := "test_collection_set_properties_sat" + col, err := db.CreateCollection(nil, name, &driver.CreateCollectionOptions{ReplicationFactor: driver.ReplicationFactorSatellite}) + if err != nil { + t.Fatalf("Failed to create collection '%s': %s", name, describe(err)) + } + + // Set ReplicationFactor to satellite (noop) + replFact := driver.ReplicationFactorSatellite + ctx := driver.WithEnforceReplicationFactor(context.Background(), false) + if err := col.SetProperties(ctx, driver.SetCollectionPropertiesOptions{ReplicationFactor: replFact}); err != nil { + t.Fatalf("Failed to set properties: %s", describe(err)) + } + if p, err := col.Properties(nil); err != nil { + t.Errorf("Failed to fetch collection properties: %s", describe(err)) + } else { + if p.ReplicationFactor != replFact { + t.Errorf("Expected ReplicationFactor %d, got %d", replFact, p.ReplicationFactor) + } + } + } else if 
driver.IsPreconditionFailed(err) { + t.Logf("ReplicationFactor tests skipped because we're not running in a cluster") + } else { + t.Errorf("Cluster failed: %s", describe(err)) + } +} + // TestCollectionRevision creates a collection, checks revision after adding documents. func TestCollectionRevision(t *testing.T) { c := createClientFromEnv(t, true) diff --git a/deps/github.com/arangodb/go-driver/test/user_auth_test.go b/deps/github.com/arangodb/go-driver/test/user_auth_test.go index e636c033d..9212a347e 100644 --- a/deps/github.com/arangodb/go-driver/test/user_auth_test.go +++ b/deps/github.com/arangodb/go-driver/test/user_auth_test.go @@ -34,6 +34,10 @@ import ( // TestUpdateUserPasswordMyself creates a user and tries to update the password of the authenticated user. func TestUpdateUserPasswordMyself(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } var conn driver.Connection c := createClientFromEnv(t, true, &conn) version, err := c.Version(nil) @@ -68,6 +72,10 @@ func TestUpdateUserPasswordMyself(t *testing.T) { // TestUpdateUserPasswordOtherUser creates a user and tries to update the password of another user. func TestUpdateUserPasswordOtherUser(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } var conn driver.Connection c := createClientFromEnv(t, true, &conn) version, err := c.Version(nil) @@ -119,6 +127,10 @@ func TestUpdateUserPasswordOtherUser(t *testing.T) { // TestGrantUserDatabase creates a user & database and granting the user access to the database. 
func TestGrantUserDatabase(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } c := createClientFromEnv(t, true) version, err := c.Version(nil) if err != nil { @@ -200,6 +212,10 @@ func TestGrantUserDatabase(t *testing.T) { // TestGrantUserDefaultDatabase creates a user & database and granting the user access to the "default" database. func TestGrantUserDefaultDatabase(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } c := createClientFromEnv(t, true) version, err := c.Version(nil) if err != nil { @@ -323,6 +339,10 @@ func TestGrantUserDefaultDatabase(t *testing.T) { // TestGrantUserCollection creates a user & database & collection and granting the user access to the collection. func TestGrantUserCollection(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } c := createClientFromEnv(t, true) version, err := c.Version(nil) if err != nil { @@ -511,6 +531,10 @@ func TestGrantUserCollection(t *testing.T) { // TestUserAccessibleDatabases creates a user & databases and checks the list of accessible databases. 
func TestUserAccessibleDatabases(t *testing.T) { + // Disable those tests for active failover + if getTestMode() == testModeResilientSingle { + t.Skip("Disabled in active failover mode") + } c := createClientFromEnv(t, true) version, err := c.Version(nil) if err != nil { diff --git a/deps/github.com/arangodb/go-driver/test/util.go b/deps/github.com/arangodb/go-driver/test/util.go index 40ea6d031..c5621861a 100644 --- a/deps/github.com/arangodb/go-driver/test/util.go +++ b/deps/github.com/arangodb/go-driver/test/util.go @@ -97,3 +97,23 @@ func getIntFromEnv(envKey string, defaultValue int) int { } return defaultValue } + +const ( + testModeCluster = "cluster" + testModeResilientSingle = "resilientsingle" + testModeSingle = "single" +) + +func getTestMode() string { + + return strings.TrimSpace(os.Getenv("TEST_MODE")) +} + +func skipNoEnterprise(t *testing.T) { + c := createClientFromEnv(t, true) + if v, err := c.Version(nil); err != nil { + t.Errorf("Failed to get version: %s", describe(err)) + } else if !v.IsEnterprise() { + t.Skipf("Enterprise only") + } +} diff --git a/deps/github.com/arangodb/go-driver/view_arangosearch.go b/deps/github.com/arangodb/go-driver/view_arangosearch.go index de847f833..0586181b3 100644 --- a/deps/github.com/arangodb/go-driver/view_arangosearch.go +++ b/deps/github.com/arangodb/go-driver/view_arangosearch.go @@ -51,7 +51,7 @@ type ArangoSearchViewProperties struct { // For the case where the consolidation policies rarely merge segments // (i.e. few inserts/deletes), a higher value will impact performance // without any added benefits. - CleanupIntervalStep int64 `json:"cleanupIntervalStep,omitempty"` + CleanupIntervalStep *int64 `json:"cleanupIntervalStep,omitempty"` // ConsolidationInterval specifies the minimum number of milliseconds that must be waited // between committing index data changes and making them visible to queries. // Defaults to 60000. 
@@ -62,33 +62,73 @@ type ArangoSearchViewProperties struct { // For the case where there are a few inserts/updates, a higher value will // impact performance and waste disk space for each commit call without // any added benefits. - ConsolidationInterval int64 `json:"consolidationIntervalMsec,omitempty"` + ConsolidationInterval *int64 `json:"consolidationIntervalMsec,omitempty"` // ConsolidationPolicy specifies thresholds for consolidation. ConsolidationPolicy *ArangoSearchConsolidationPolicy `json:"consolidationPolicy,omitempty"` - /* - // Locale specifies the default locale used for queries on analyzed string values. - // Defaults to "C". TODO What is that? - Locale Locale `json:"locale,omitempty"` - */ + // WriteBufferIdel specifies the maximum number of writers (segments) cached in the pool. + // 0 value turns off caching, default value is 64. + WriteBufferIdel *int64 `json:"writebufferIdle,omitempty"` + + // WriteBufferActive specifies the maximum number of concurrent active writers (segments) performing (a transaction). + // Other writers (segments) wait until the current active writers (segments) finish. + // 0 value turns off this limit and is used by default. + WriteBufferActive *int64 `json:"writebufferActive,omitempty"` + + // WriteBufferSizeMax specifies maximum memory byte size per writer (segment) before a writer (segment) flush is triggered. + // 0 value turns off this limit for any writer (buffer) and will be flushed only after a period defined for special thread during ArangoDB server startup. + // 0 value should be used with care due to high potential memory consumption. + WriteBufferSizeMax *int64 `json:"writebufferSizeMax,omitempty"` + // Links contains the properties for how individual collections // are indexed in this view. // The keys of the map are collection names. Links ArangoSearchLinks `json:"links,omitempty"` } -// Locale is a strongly typed specifier of a locale. -// TODO specify semantics. 
-type Locale string +// ArangoSearchConsolidationPolicyType strings for consolidation types +type ArangoSearchConsolidationPolicyType string + +const ( + // ArangoSearchConsolidationPolicyTypeTier consolidates based on segment byte size and live document count as dictated by the customization attributes. + ArangoSearchConsolidationPolicyTypeTier ArangoSearchConsolidationPolicyType = "tier" + // ArangoSearchConsolidationPolicyTypeBytesAccum consolidates if and only if ({threshold} range [0.0, 1.0]) + // {threshold} > (segment_bytes + sum_of_merge_candidate_segment_bytes) / all_segment_bytes, + // i.e. the sum of all candidate segment's byte size is less than the total segment byte size multiplied by the {threshold}. + ArangoSearchConsolidationPolicyTypeBytesAccum ArangoSearchConsolidationPolicyType = "bytes_accum" +) // ArangoSearchConsolidationPolicy holds threshold values specifying when to // consolidate view data. // Semantics of the values depend on where they are used. type ArangoSearchConsolidationPolicy struct { - // Threshold is a percentage (0..1) - Threshold float64 `json:"threshold,omitempty"` - // SegmentThreshold is an absolute value. - SegmentThreshold int64 `json:"segmentThreshold,omitempty"` + // Type returns the type of the ConsolidationPolicy. This interface can then be cast to the corresponding ArangoSearchConsolidationPolicy* struct. 
+ Type ArangoSearchConsolidationPolicyType `json:"type,omitempty"` + + ArangoSearchConsolidationPolicyBytesAccum + ArangoSearchConsolidationPolicyTier +} + +// ArangoSearchConsolidationPolicyBytesAccum contains fields used for ArangoSearchConsolidationPolicyTypeBytesAccum +type ArangoSearchConsolidationPolicyBytesAccum struct { + // Threshold, see ArangoSearchConsolidationPolicyTypeBytesAccum + Threshold *float64 `json:"threshold,omitempty"` +} + +// ArangoSearchConsolidationPolicyTier contains fields used for ArangoSearchConsolidationPolicyTypeTier +type ArangoSearchConsolidationPolicyTier struct { + // MinSegments specifies the minimum number of segments that will be evaluated as candidates for consolidation. + MinSegments *int64 `json:"minSegments,omitempty"` + // MaxSegments specifies the maximum number of segments that will be evaluated as candidates for consolidation. + MaxSegments *int64 `json:"maxSegments,omitempty"` + // SegmentsBytesMax specifies the maximum allowed size of all consolidated segments in bytes. + SegmentsBytesMax *int64 `json:"segmentsBytesMax,omitempty"` + // SegmentsBytesFloor defines the value (in bytes) to treat all smaller segments as equal for consolidation selection. + SegmentsBytesFloor *int64 `json:"segmentsBytesFloor,omitempty"` + // Lookahead specifies the number of additionally searched tiers except initially chosen candidates based on min_segments, + // max_segments, segments_bytes_max, segments_bytes_floor with respect to defined values. + // Default value falls to integer_traits::const_max (in C++ source code). 
+ Lookahead *int64 `json:"lookahead,omitempty"` } // ArangoSearchLinks is a strongly typed map containing links between a diff --git a/pkg/apis/deployment/v1alpha/conditions.go b/pkg/apis/deployment/v1alpha/conditions.go index c15698ad6..165f23972 100644 --- a/pkg/apis/deployment/v1alpha/conditions.go +++ b/pkg/apis/deployment/v1alpha/conditions.go @@ -53,6 +53,8 @@ const ( ConditionTypeSecretsChanged ConditionType = "SecretsChanged" // ConditionTypeMemberOfCluster indicates that the member is a known member of the ArangoDB cluster. ConditionTypeMemberOfCluster ConditionType = "MemberOfCluster" + // ConditionTypeTerminating indicates that the member is terminating but not yet terminated. + ConditionTypeTerminating ConditionType = "Terminating" ) // Condition represents one current condition of a deployment or deployment member. diff --git a/pkg/deployment/reconcile/action_remove_member.go b/pkg/deployment/reconcile/action_remove_member.go index 08df4bba0..70e99f57e 100644 --- a/pkg/deployment/reconcile/action_remove_member.go +++ b/pkg/deployment/reconcile/action_remove_member.go @@ -81,10 +81,16 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) { } // We don't care if not found if record, ok := health.Health[driver.ServerID(m.ID)]; ok { - if record.Status != driver.ServerStatusFailed { - return false, maskAny(fmt.Errorf("can not remove server from cluster. Not yet terminated. Retry later")) + + // Check if the pod is terminating + if m.Conditions.IsTrue(api.ConditionTypeTerminating) { + + if record.Status != driver.ServerStatusFailed { + return false, maskAny(fmt.Errorf("can not remove server from cluster. Not yet terminated. 
Retry later")) + } + + a.log.Debug().Msg("dbserver has shut down") } - a.log.Warn().Msg("dbserver is failed but still in use") } } else { a.log.Warn().Msgf("ignoring error: %s", err.Error()) diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go index 6466758a8..71c98a369 100644 --- a/pkg/deployment/reconcile/action_wait_for_member_up.go +++ b/pkg/deployment/reconcile/action_wait_for_member_up.go @@ -167,6 +167,26 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good") return false, false, nil } + if a.action.Group == api.ServerGroupDBServers { + dbs, err := c.Databases(ctx) + if err != nil { + return false, false, err + } + for _, db := range dbs { + inv, err := cluster.DatabaseInventory(ctx, db) + if err != nil { + return false, false, err + } + + for _, col := range inv.Collections { + if !col.AllInSync { + log.Debug().Str("col", col.Parameters.Name).Msg("Not in sync") + return false, false, nil + } + } + } + + } // Wait for the member to become ready from a kubernetes point of view // otherwise the coordinators may be rotated to fast and thus none of them // is ready resulting in a short downtime diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go index 1904c9b25..a79534346 100644 --- a/pkg/deployment/reconcile/plan_builder.go +++ b/pkg/deployment/reconcile/plan_builder.go @@ -180,7 +180,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, if podName := m.PodName; podName != "" { if p := getPod(podName); p != nil { // Got pod, compare it with what it should be - decision := podNeedsUpgrading(*p, spec, status.Images) + decision := podNeedsUpgrading(log, *p, spec, status.Images) if decision.UpgradeNeeded && !decision.UpgradeAllowed { // Oops, upgrade is not allowed upgradeNotAllowed = true @@ -193,7 +193,8 @@ func 
createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, // Only rotate/upgrade 1 pod at a time if decision.UpgradeNeeded { // Yes, upgrade is needed (and allowed) - newPlan = createUpgradeMemberPlan(log, m, group, "Version upgrade", spec.GetImage(), status) + newPlan = createUpgradeMemberPlan(log, m, group, "Version upgrade", spec.GetImage(), status, !decision.AutoUpgradeNeeded) + } else { // Upgrade is not needed, see if rotation is needed if rotNeeded, reason := podNeedsRotation(log, *p, apiObject, spec, group, status, m.ID, context); rotNeeded { @@ -238,7 +239,7 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject, // podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with // the given spec) and if that is allowed. -func podNeedsUpgrading(p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision { +func podNeedsUpgrading(log zerolog.Logger, p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision { if c, found := k8sutil.GetContainerByName(&p, k8sutil.ServerContainerName); found { specImageInfo, found := images.GetByImage(spec.GetImage()) if !found { @@ -276,6 +277,10 @@ func podNeedsUpgrading(p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoLi } if specVersion.Major() != podVersion.Major() || specVersion.Minor() != podVersion.Minor() { // Is allowed, with `--database.auto-upgrade` + log.Info().Str("spec-version", string(specVersion)).Str("pod-version", string(podVersion)). + Int("spec-version.major", specVersion.Major()).Int("spec-version.minor", specVersion.Minor()). + Int("pod-version.major", podVersion.Major()).Int("pod-version.minor", podVersion.Minor()). 
+ Str("pod", p.GetName()).Msg("Deciding to do a upgrade with --auto-upgrade") return upgradeDecision{ FromVersion: podVersion, FromLicense: podLicense, @@ -415,9 +420,9 @@ func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus, // createUpgradeMemberPlan creates a plan to upgrade (stop-recreateWithAutoUpgrade-stop-start) an existing // member. func createUpgradeMemberPlan(log zerolog.Logger, member api.MemberStatus, - group api.ServerGroup, reason string, imageName string, status api.DeploymentStatus) api.Plan { + group api.ServerGroup, reason string, imageName string, status api.DeploymentStatus, rotateStatefull bool) api.Plan { upgradeAction := api.ActionTypeUpgradeMember - if group.IsStateless() { + if rotateStatefull || group.IsStateless() { upgradeAction = api.ActionTypeRotateMember } log.Debug(). diff --git a/pkg/deployment/resources/pod_creator.go b/pkg/deployment/resources/pod_creator.go index 3f3542629..406ccfe90 100644 --- a/pkg/deployment/resources/pod_creator.go +++ b/pkg/deployment/resources/pod_creator.go @@ -674,6 +674,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, memberID string, m.Phase = newPhase m.Conditions.Remove(api.ConditionTypeReady) m.Conditions.Remove(api.ConditionTypeTerminated) + m.Conditions.Remove(api.ConditionTypeTerminating) m.Conditions.Remove(api.ConditionTypeAgentRecoveryNeeded) m.Conditions.Remove(api.ConditionTypeAutoUpgrade) if err := status.Members.Update(m, group); err != nil { diff --git a/pkg/deployment/resources/pod_inspector.go b/pkg/deployment/resources/pod_inspector.go index 822852ae9..207c37e62 100644 --- a/pkg/deployment/resources/pod_inspector.go +++ b/pkg/deployment/resources/pod_inspector.go @@ -149,6 +149,10 @@ func (r *Resources) InspectPods(ctx context.Context) (util.Interval, error) { unscheduledPodNames = append(unscheduledPodNames, p.GetName()) } if k8sutil.IsPodMarkedForDeletion(&p) { + if memberStatus.Conditions.Update(api.ConditionTypeTerminating, true, "Pod marked 
for deletion", "") { + updateMemberStatusNeeded = true + log.Debug().Str("pod-name", p.GetName()).Msg("Pod marked as terminating") + } // Process finalizers if x, err := r.runPodFinalizers(ctx, &p, memberStatus, func(m api.MemberStatus) error { updateMemberStatusNeeded = true diff --git a/pkg/deployment/resources/pod_termination.go b/pkg/deployment/resources/pod_termination.go index 8e065681a..98c207569 100644 --- a/pkg/deployment/resources/pod_termination.go +++ b/pkg/deployment/resources/pod_termination.go @@ -161,7 +161,7 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol } // Once decided to drain the member, never go back - if memberStatus.Phase == api.MemberPhaseCreated { + if memberStatus.Phase == api.MemberPhaseDrain { dbserverDataWillBeGone = true }