Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDKServer Player Tracking implementation #1507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
145 changes: 130 additions & 15 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/clock"
Expand All @@ -53,10 +54,12 @@ import (
type Operation string

const (
updateState Operation = "updateState"
updateLabel Operation = "updateLabel"
updateAnnotation Operation = "updateAnnotation"
updatePlayerCapacity Operation = "updatePlayerCapacity"
updateState Operation = "updateState"
updateLabel Operation = "updateLabel"
updateAnnotation Operation = "updateAnnotation"
updatePlayerCapacity Operation = "updatePlayerCapacity"
updateConnectedPlayers Operation = "updateConnectedPlayers"
playerCountUpdatePeriod time.Duration = time.Second
)

var (
Expand Down Expand Up @@ -96,6 +99,7 @@ type SDKServer struct {
reserveTimer *time.Timer
gsReserveDuration *time.Duration
gsPlayerCapacity int64
gsConnectedPlayers []string
}

// NewSDKServer creates a SDKServer that sets up an
Expand Down Expand Up @@ -129,6 +133,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
gsAnnotations: map[string]string{},
gsUpdateMutex: sync.RWMutex{},
gsWaitForSync: sync.WaitGroup{},
gsConnectedPlayers: []string{},
}

s.informerFactory = factory
Expand Down Expand Up @@ -230,11 +235,12 @@ func (s *SDKServer) Run(stop <-chan struct{}) error {
go wait.Until(s.runHealth, s.healthTimeout, stop)
}

// populate player capacity value
// populate player tracking values
if runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
s.gsUpdateMutex.Lock()
if gs.Status.Players != nil {
s.gsPlayerCapacity = gs.Status.Players.Capacity
s.gsConnectedPlayers = gs.Status.Players.IDs
}
s.gsUpdateMutex.Unlock()
}
Expand Down Expand Up @@ -272,6 +278,8 @@ func (s *SDKServer) syncGameServer(key string) error {
return s.updateAnnotations()
case updatePlayerCapacity:
return s.updatePlayerCapacity()
case updateConnectedPlayers:
return s.updateConnectedPlayers()
}

return errors.Errorf("could not sync game server key: %s", key)
Expand Down Expand Up @@ -547,43 +555,117 @@ func (s *SDKServer) stopReserveTimer() {
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) PlayerConnect(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.logger.WithField("PlayerId", id.PlayerID).Debug("Player Connected")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

// the player is already connected, return false.
for _, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
return &alpha.Bool{Bool: false}, nil
}
}

if int64(len(s.gsConnectedPlayers)) >= s.gsPlayerCapacity {
return &alpha.Bool{}, errors.New("players are already at capacity")
}

// let's retain the original order, as it should be a smaller patch on data change
s.gsConnectedPlayers = append(s.gsConnectedPlayers, id.PlayerID)
s.workerqueue.EnqueueAfter(cache.ExplicitKey(string(updateConnectedPlayers)), playerCountUpdatePeriod)

return &alpha.Bool{Bool: true}, nil
}

// PlayerDisconnect should be called when a player disconnects.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) PlayerDisconnect(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.logger.WithField("PlayerId", id.PlayerID).Debug("Player Disconnected")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

found := -1
for i, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
found = i
aLekSer marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if found == -1 {
return &alpha.Bool{Bool: false}, nil
}

// let's retain the original order, as it should be a smaller patch on data change
s.gsConnectedPlayers = append(s.gsConnectedPlayers[:found], s.gsConnectedPlayers[found+1:]...)
aLekSer marked this conversation as resolved.
Show resolved Hide resolved
s.workerqueue.EnqueueAfter(cache.ExplicitKey(string(updateConnectedPlayers)), playerCountUpdatePeriod)

return &alpha.Bool{Bool: true}, nil
}

// IsPlayerConnected returns if the player ID is connected or not
// IsPlayerConnected returns if the playerID is currently connected to the GameServer.
// This is always accurate, even if the value hasn’t been updated to the GameServer status yet.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) IsPlayerConnected(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()

result := &alpha.Bool{Bool: false}

for _, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
result.Bool = true
break
}
}

return result, nil
}

// GetConnectedPlayers returns if the players are connected or not
// GetConnectedPlayers returns the list of the currently connected player ids.
// This is always accurate, even if the value hasn’t been updated to the GameServer status yet.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetConnectedPlayers(ctx context.Context, empty *alpha.Empty) (*alpha.PlayerIDList, error) {
panic("implement me")
func (s *SDKServer) GetConnectedPlayers(c context.Context, empty *alpha.Empty) (*alpha.PlayerIDList, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()

return &alpha.PlayerIDList{List: s.gsConnectedPlayers}, nil
}

// GetPlayerCount returns the current player count.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetPlayerCount(ctx context.Context, _ *alpha.Empty) (*alpha.Count, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()
return &alpha.Count{Count: int64(len(s.gsConnectedPlayers))}, nil
}

// SetPlayerCapacity to change the game server's player capacity.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) SetPlayerCapacity(ctx context.Context, count *alpha.Count) (*alpha.Empty, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.Lock()
s.gsPlayerCapacity = count.Count
Expand All @@ -598,7 +680,7 @@ func (s *SDKServer) SetPlayerCapacity(ctx context.Context, count *alpha.Count) (
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetPlayerCapacity(ctx context.Context, _ *alpha.Empty) (*alpha.Count, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()
Expand Down Expand Up @@ -694,3 +776,36 @@ func (s *SDKServer) updatePlayerCapacity() error {
s.recorder.Event(gs, corev1.EventTypeNormal, "PlayerCapacity", fmt.Sprintf("Set to %d", gs.Status.Players.Capacity))
return nil
}

// updateConnectedPlayers updates the Player IDs and Count fields in the GameServer's Status.
func (s *SDKServer) updateConnectedPlayers() error {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
}
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
same := false
s.gsUpdateMutex.RLock()
s.logger.WithField("playerIDs", s.gsConnectedPlayers).Debug("updating connected players")
same = apiequality.Semantic.DeepEqual(gsCopy.Status.Players.IDs, s.gsConnectedPlayers)
gsCopy.Status.Players.IDs = s.gsConnectedPlayers
gsCopy.Status.Players.Count = int64(len(s.gsConnectedPlayers))
s.gsUpdateMutex.RUnlock()
// if there is no change, then don't update
// since it's possible this could fire quite a lot, let's reduce the
// amount of requests as much as possible.
if same {
return nil
}

gs, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
if err != nil {
return err
}
s.recorder.Event(gs, corev1.EventTypeNormal, "PlayerCount", fmt.Sprintf("Set to %d", gs.Status.Players.Count))
return nil
}