Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reserve() SDK implementation #891

Merged
merged 2 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 78 additions & 60 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package sdkserver

import (
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/clock"
Expand All @@ -56,17 +56,7 @@ const (
updateAnnotation Operation = "updateAnnotation"
)

var (
_ sdk.SDKServer = &SDKServer{}

defaultTimeout = 30 * time.Second
defaultBackoff = wait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}
)
var _ sdk.SDKServer = &SDKServer{}

// SDKServer is a gRPC server, that is meant to be a sidecar
// for a GameServer that will update the game server status on SDK requests
Expand Down Expand Up @@ -96,6 +86,8 @@ type SDKServer struct {
gsState stablev1alpha1.GameServerState
gsUpdateMutex sync.RWMutex
gsWaitForSync sync.WaitGroup
reserveTimer *time.Timer
gsReserveDuration *time.Duration
}

// NewSDKServer creates a SDKServer that sets up an
Expand Down Expand Up @@ -205,6 +197,12 @@ func (s *SDKServer) Run(stop <-chan struct{}) error {
s.healthTimeout = time.Duration(gs.Spec.Health.PeriodSeconds) * time.Second
s.initHealthLastUpdated(time.Duration(gs.Spec.Health.InitialDelaySeconds) * time.Second)

if gs.Status.State == stablev1alpha1.GameServerStateReserved && gs.Status.ReservedUntil != nil {
s.gsUpdateMutex.Lock()
s.resetReserveAfter(context.Background(), time.Until(gs.Status.ReservedUntil.Time))
s.gsUpdateMutex.Unlock()
}

// start health checking running
if !s.health.Disabled {
s.logger.Info("Starting GameServer health checking")
Expand Down Expand Up @@ -261,35 +259,51 @@ func (s *SDKServer) updateState() error {
return err
}

// if we are currently in shutdown/being deleted, there is no escaping
// If we are currently in shutdown/being deleted, there is no escaping.
if gs.IsBeingDeleted() {
s.logger.Info("GameServerState being shutdown. Skipping update.")
return nil
}

// if the state is currently unhealthy, you can't go back to Ready
// If the state is currently unhealthy, you can't go back to Ready.
if gs.Status.State == stablev1alpha1.GameServerStateUnhealthy {
s.logger.Info("GameServerState already unhealthy. Skipping update.")
return nil
}

s.gsUpdateMutex.RLock()
gs.Status.State = s.gsState

// If we are setting the Reserved status, check for the duration, and set that too.
if gs.Status.State == stablev1alpha1.GameServerStateReserved && s.gsReserveDuration != nil {
n := metav1.NewTime(time.Now().Add(*s.gsReserveDuration))
gs.Status.ReservedUntil = &n
} else {
gs.Status.ReservedUntil = nil
}
s.gsUpdateMutex.RUnlock()

_, err = gameServers.Update(gs)
if err != nil {
return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, gs.Status.State)
}

message := "SDK state change"
level := corev1.EventTypeNormal
// post state specific work here
switch gs.Status.State {
case stablev1alpha1.GameServerStateUnhealthy:
level = corev1.EventTypeWarning
case stablev1alpha1.GameServerStateReserved:
s.gsUpdateMutex.Lock()
if s.gsReserveDuration != nil {
message += fmt.Sprintf(", for %s", s.gsReserveDuration)
s.resetReserveAfter(context.Background(), *s.gsReserveDuration)
}
s.gsUpdateMutex.Unlock()
}

s.recorder.Event(gs, level, string(gs.Status.State), "SDK state change")
s.recorder.Event(gs, level, string(gs.Status.State), message)

return nil
}
Expand Down Expand Up @@ -363,58 +377,23 @@ func (s *SDKServer) enqueueState(state stablev1alpha1.GameServerState) {
// the workqueue so it can be updated
func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) {
s.logger.Info("Received Ready request, adding to queue")
s.stopReserveTimer()
s.enqueueState(stablev1alpha1.GameServerStateRequestReady)
return e, nil
}

// Allocate set the GameServer to Allocate, as longs as it's not in UnHealthy,
// Shutdown or has a DeletionTimeStamp(). Times out after 30 seconds if it cannot
// complete the operation due to contention issues.
func (s *SDKServer) Allocate(context.Context, *sdk.Empty) (*sdk.Empty, error) {
s.logger.Info("Received self Allocate request")

now := s.clock.Now()
err := wait.ExponentialBackoff(defaultBackoff, func() (done bool, err error) {
gs, err := s.gameServer()
if err != nil {
return true, err
}

if !gs.ObjectMeta.DeletionTimestamp.IsZero() {
return true, nil
}

switch gs.Status.State {
case stablev1alpha1.GameServerStateUnhealthy:
return true, errors.New("cannot Allocate an Unhealthy GameServer")

case stablev1alpha1.GameServerStateShutdown:
return true, errors.New("cannot Allocate a Shutdown GameServer")
}

gsCopy := gs.DeepCopy()
gsCopy.Status.State = stablev1alpha1.GameServerStateAllocated
_, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)

// if a contention, and we are under the timeout period.
if k8serrors.IsConflict(err) {
if s.clock.Since(now) > defaultTimeout {
return true, errors.New("Allocation request timed out")
}

return false, nil
}

return true, errors.Wrap(err, "could not update gameserver to Allocated")
})

return &sdk.Empty{}, errors.WithStack(err)
// Allocate enters an Allocate state change into the workqueue, so it can be updated
func (s *SDKServer) Allocate(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) {
s.stopReserveTimer()
s.enqueueState(stablev1alpha1.GameServerStateAllocated)
return e, nil
}

// Shutdown enters the Shutdown state change for this GameServer into
// the workqueue so it can be updated
func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) {
s.logger.Info("Received Shutdown request, adding to queue")
s.stopReserveTimer()
s.enqueueState(stablev1alpha1.GameServerStateShutdown)
return e, nil
}
Expand Down Expand Up @@ -486,9 +465,48 @@ func (s *SDKServer) WatchGameServer(_ *sdk.Empty, stream sdk.SDK_WatchGameServer
}

// Reserve moves this GameServer to the Reserved state for the Duration specified
// TODO: implement this functionality
func (s *SDKServer) Reserve(_ context.Context, d *sdk.Duration) (*sdk.Empty, error) {
return &sdk.Empty{}, errors.New("not implemented")
func (s *SDKServer) Reserve(ctx context.Context, d *sdk.Duration) (*sdk.Empty, error) {
s.stopReserveTimer()

e := &sdk.Empty{}

// 0 is forever.
if d.Seconds > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check actually means that 0 or a negative value means forever.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that was deliberate. I wanted to handle the error condition in a sane way.

Do you feel I should change the docs?

duration := time.Duration(d.Seconds) * time.Second
s.gsUpdateMutex.Lock()
s.gsReserveDuration = &duration
s.gsUpdateMutex.Unlock()
}

s.logger.Info("Received Reserve request, adding to queue")
s.enqueueState(stablev1alpha1.GameServerStateReserved)

return e, nil
}

// resetReserveAfter will move the GameServer back to being ready after the specified duration.
// This function should be wrapped in a s.gsUpdateMutex lock when being called.
func (s *SDKServer) resetReserveAfter(ctx context.Context, duration time.Duration) {
if s.reserveTimer != nil {
s.reserveTimer.Stop()
}

s.reserveTimer = time.AfterFunc(duration, func() {
if _, err := s.Ready(ctx, &sdk.Empty{}); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("error returning to Ready after reserved")
}
})
}

// stopReserveTimer stops the reserve timer. This is a no-op and safe to call if the timer is nil
func (s *SDKServer) stopReserveTimer() {
s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

if s.reserveTimer != nil {
s.reserveTimer.Stop()
}
s.gsReserveDuration = nil
}

// sendGameServerUpdate sends a watch game server event
Expand Down
Loading