Skip to content

Commit

Permalink
Allocated GameServers updated on Fleet update (#3101)
Browse files Browse the repository at this point in the history
* Allocated GameServers updated on Fleet update

Functional implementation and testing of applying labels and/or
annotations to any `Allocated` `GameServers` that are overflowed past
the configured replica count.

Next ➡️ write some guides to close out the ticket.

Work on #2682

* Review updates.
  • Loading branch information
markmandel committed Apr 22, 2023
1 parent 0064c14 commit 0fc3532
Show file tree
Hide file tree
Showing 12 changed files with 3,650 additions and 2,886 deletions.
11 changes: 11 additions & 0 deletions examples/fleet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ spec:
maxSurge: 25%
# the amount to decrements GameServers by. Defaults to 25%
maxUnavailable: 25%
# [Stage:Alpha]
# [FeatureFlag:FleetAllocationOverflow]
# Labels and/or Annotations to apply to overflowing GameServers when the number of Allocated GameServers is more
# than the desired replicas on the underlying `GameServerSet`
# Commented out since Alpha, and disabled by default
# allocationOverflow:
# labels:
# mykey: myvalue
# version: "" # empty an existing label value
# annotations:
# otherkey: setthisvalue
template:
# GameServer metadata
metadata:
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/agones/v1/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type FleetSpec struct {
Replicas int32 `json:"replicas"`
// [Stage: Alpha]
// [FeatureFlag:FleetAllocationOverflow]
// Labels and Annotations to apply to GameServers when the number of Allocated GameServers drops below
// the desired replicas on the underlying `GameServerSet`
// Labels and/or Annotations to apply to overflowing GameServers when the number of Allocated GameServers is more
// than the desired replicas on the underlying `GameServerSet`
// +optional
AllocationOverflow *AllocationOverflow `json:"allocationOverflow,omitempty"`
// Deployment strategy
Expand Down
162 changes: 162 additions & 0 deletions pkg/gameserversets/allocation_overflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gameserversets

import (
"context"
"time"

"agones.dev/agones/pkg/apis/agones"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/client/clientset/versioned"
getterv1 "agones.dev/agones/pkg/client/clientset/versioned/typed/agones/v1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1 "agones.dev/agones/pkg/client/listers/agones/v1"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/heptiolabs/healthcheck"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)

// AllocationOverflowController watches `GameServerSets`, and those with configured
// AllocationOverflow settings, will the relevant labels and annotations to `GameServers` attached to the given
// `GameServerSet`
type AllocationOverflowController struct {
baseLogger *logrus.Entry
counter *gameservers.PerNodeCounter
gameServerSynced cache.InformerSynced
gameServerGetter getterv1.GameServersGetter
gameServerLister listerv1.GameServerLister
gameServerSetSynced cache.InformerSynced
gameServerSetLister listerv1.GameServerSetLister
workerqueue *workerqueue.WorkerQueue
}

// NewAllocatorOverflowController returns a new AllocationOverflowController
func NewAllocatorOverflowController(
health healthcheck.Handler,
counter *gameservers.PerNodeCounter,
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory) *AllocationOverflowController {
gameServers := agonesInformerFactory.Agones().V1().GameServers()
gameServerSet := agonesInformerFactory.Agones().V1().GameServerSets()
gsSetInformer := gameServerSet.Informer()

c := &AllocationOverflowController{
counter: counter,
gameServerSynced: gameServers.Informer().HasSynced,
gameServerGetter: agonesClient.AgonesV1(),
gameServerLister: gameServers.Lister(),
gameServerSetSynced: gsSetInformer.HasSynced,
gameServerSetLister: gameServerSet.Lister(),
}

c.baseLogger = runtime.NewLoggerWithType(c)
c.baseLogger.Debug("Created!")
c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServerSet, c.baseLogger, logfields.GameServerSetKey, agones.GroupName+".GameServerSetController", workerqueue.FastRateLimiter(3*time.Second))
health.AddLivenessCheck("gameserverset-allocationoverflow-workerqueue", c.workerqueue.Healthy)

gsSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
newGss := newObj.(*agonesv1.GameServerSet)

// Only process if there is an AllocationOverflow, and it has labels or annotations.
if newGss.Spec.AllocationOverflow == nil {
return
} else if len(newGss.Spec.AllocationOverflow.Labels) == 0 && len(newGss.Spec.AllocationOverflow.Annotations) == 0 {
return
}
if newGss.Status.AllocatedReplicas > newGss.Spec.Replicas {
c.workerqueue.Enqueue(newGss)
}
},
})

return c
}

// Run this controller. Will block until stop is closed.
func (c *AllocationOverflowController) Run(ctx context.Context) error {
c.baseLogger.Debug("Wait for cache sync")
if !cache.WaitForCacheSync(ctx.Done(), c.gameServerSynced, c.gameServerSetSynced) {
return errors.New("failed to wait for caches to sync")
}

c.workerqueue.Run(ctx, 1)
return nil
}

// syncGameServerSet checks to see if there are overflow Allocated GameServers and applied the labels and/or
// annotations to the requisite number of GameServers needed to alert the underlying system.
func (c *AllocationOverflowController) syncGameServerSet(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// don't return an error, as we don't want this retried
runtime.HandleError(loggerForGameServerSetKey(c.baseLogger, key), errors.Wrapf(err, "invalid resource key"))
return nil
}

gsSet, err := c.gameServerSetLister.GameServerSets(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
loggerForGameServerSetKey(c.baseLogger, key).Debug("GameServerSet is no longer available for syncing")
return nil
}
return errors.Wrapf(err, "error retrieving GameServerSet %s from namespace %s", name, namespace)
}

// just in case something changed, double check to avoid panics and/or sending work to the K8s API that we don't
// need to
if gsSet.Spec.AllocationOverflow == nil {
return nil
}
if gsSet.Status.AllocatedReplicas <= gsSet.Spec.Replicas {
return nil
}

overflow := gsSet.Status.AllocatedReplicas - gsSet.Spec.Replicas

list, err := ListGameServersByGameServerSetOwner(c.gameServerLister, gsSet)
if err != nil {
return err
}

matches, rest := gsSet.Spec.AllocationOverflow.CountMatches(list)
if matches >= overflow {
return nil
}

rest = sortGameServersByStrategy(gsSet.Spec.Scheduling, rest, c.counter.Counts())
rest = rest[:(overflow - matches)]

opts := v1.UpdateOptions{}
for _, gs := range rest {
gsCopy := gs.DeepCopy()
gsSet.Spec.AllocationOverflow.Apply(gsCopy)

if _, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gsCopy, opts); err != nil {
return errors.Wrapf(err, "error updating GameServer %s with overflow labels and/or annotations", gs.ObjectMeta.Name)
}
}

return nil
}
203 changes: 203 additions & 0 deletions pkg/gameserversets/allocation_overflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2023 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gameserversets

import (
"context"
"fmt"
"testing"
"time"

agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
"agones.dev/agones/pkg/gameservers"
agtesting "agones.dev/agones/pkg/testing"
"github.com/heptiolabs/healthcheck"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
k8stesting "k8s.io/client-go/testing"
)

func TestAllocationOverflowControllerWatchGameServers(t *testing.T) {
t.Parallel()

gsSet := defaultFixture()
gsSet.Status.Replicas = gsSet.Spec.Replicas
gsSet.Status.ReadyReplicas = gsSet.Spec.Replicas
c, m := newFakeAllocationOverflowController()

received := make(chan string, 10)
defer close(received)

gsSetWatch := watch.NewFake()
m.AgonesClient.AddWatchReactor("gameserversets", k8stesting.DefaultWatchReactor(gsSetWatch, nil))

c.workerqueue.SyncHandler = func(_ context.Context, name string) error {
received <- name
return nil
}

ctx, cancel := agtesting.StartInformers(m, c.gameServerSetSynced)
defer cancel()

go func() {
err := c.Run(ctx)
require.NoError(t, err)
}()

change := func() string {
select {
case result := <-received:
return result
case <-time.After(3 * time.Second):
require.FailNow(t, "timeout occurred")
}
return ""
}

nochange := func() {
select {
case <-received:
assert.Fail(t, "Should be no value")
case <-time.After(time.Second):
}
}

gsSetWatch.Add(gsSet.DeepCopy())
nochange()

// update with no allocation overflow
require.Nil(t, gsSet.Spec.AllocationOverflow)
gsSet.Spec.Replicas++
gsSetWatch.Modify(gsSet.DeepCopy())
nochange()

// update with no labels or annotations
gsSet.Spec.AllocationOverflow = &agonesv1.AllocationOverflow{}
gsSet.Spec.Replicas++
gsSetWatch.Modify(gsSet.DeepCopy())
nochange()

// update with allocation <= replicas (and a label)
gsSet.Spec.AllocationOverflow.Labels = map[string]string{"colour": "green"}
gsSet.Status.AllocatedReplicas = 2
gsSetWatch.Modify(gsSet.DeepCopy())
nochange()

// update with allocation > replicas
gsSet.Status.AllocatedReplicas = 20
gsSetWatch.Modify(gsSet.DeepCopy())
require.Equal(t, fmt.Sprintf("%s/%s", gsSet.ObjectMeta.Namespace, gsSet.ObjectMeta.Name), change())

// delete
gsSetWatch.Delete(gsSet.DeepCopy())
nochange()
}

func TestAllocationOverflowSyncGameServerSet(t *testing.T) {
t.Parallel()

// setup fictures.
setup := func(gs func(server *agonesv1.GameServer)) (*agonesv1.GameServerSet, *AllocationOverflowController, agtesting.Mocks) {
gsSet := defaultFixture()
gsSet.Status.AllocatedReplicas = 5
gsSet.Status.Replicas = 3
gsSet.Spec.Replicas = 3
gsSet.Spec.AllocationOverflow = &agonesv1.AllocationOverflow{Labels: map[string]string{"colour": "green"}}
list := createGameServers(gsSet, 5)
for i := range list {
list[i].Status.State = agonesv1.GameServerStateAllocated
gs(&list[i])
}

c, m := newFakeAllocationOverflowController()
m.AgonesClient.AddReactor("list", "gameserversets", func(action k8stesting.Action) (bool, runtime.Object, error) {
return true, &agonesv1.GameServerSetList{Items: []agonesv1.GameServerSet{*gsSet}}, nil
})
m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
return true, &agonesv1.GameServerList{Items: list}, nil
})
return gsSet, c, m
}

// run the sync process
run := func(c *AllocationOverflowController, m agtesting.Mocks, gsSet *agonesv1.GameServerSet, update func(action k8stesting.Action) (bool, runtime.Object, error)) func() {
m.AgonesClient.AddReactor("update", "gameservers", update)
ctx, cancel := agtesting.StartInformers(m, c.gameServerSetSynced, c.gameServerSynced)
err := c.syncGameServerSet(ctx, gsSet.ObjectMeta.Namespace+"/"+gsSet.ObjectMeta.Name)
require.NoError(t, err)
return cancel
}

t.Run("labels are applied", func(t *testing.T) {
gsSet, c, m := setup(func(_ *agonesv1.GameServer) {})
count := 0
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) {
ua := action.(k8stesting.UpdateAction)
gs := ua.GetObject().(*agonesv1.GameServer)
require.Equal(t, gs.Status.State, agonesv1.GameServerStateAllocated)
require.Equal(t, "green", gs.ObjectMeta.Labels["colour"])

count++
return true, nil, nil
})
defer cancel()
require.Equal(t, 2, count)
})

t.Run("Labels are already set", func(t *testing.T) {
gsSet, c, m := setup(func(gs *agonesv1.GameServer) {
gs.ObjectMeta.Labels["colour"] = "green"
})
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) {
require.Fail(t, "should not update")
return true, nil, nil
})
defer cancel()
})

t.Run("one label is set", func(t *testing.T) {
set := false
gsSet, c, m := setup(func(gs *agonesv1.GameServer) {
// just make one as already set
if !set {
gs.ObjectMeta.Labels["colour"] = "green"
set = true
}
})

count := 0
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) {
ua := action.(k8stesting.UpdateAction)
gs := ua.GetObject().(*agonesv1.GameServer)
require.Equal(t, gs.Status.State, agonesv1.GameServerStateAllocated)
require.Equal(t, "green", gs.ObjectMeta.Labels["colour"])

count++
return true, nil, nil
})
defer cancel()
require.Equal(t, 1, count)
})
}

// newFakeAllocationOverflowController returns a controller, backed by the fake Clientset
func newFakeAllocationOverflowController() (*AllocationOverflowController, agtesting.Mocks) {
m := agtesting.NewMocks()
counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory)
c := NewAllocatorOverflowController(healthcheck.NewHandler(), counter, m.AgonesClient, m.AgonesInformerFactory)
return c, m
}

0 comments on commit 0fc3532

Please sign in to comment.