Skip to content

Commit

Permalink
Implementing multi-cluster allocation policy selector.
Browse files Browse the repository at this point in the history
  • Loading branch information
pooneh-m authored and markmandel committed Apr 25, 2019
1 parent 83a3e04 commit fd86d86
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 0 deletions.
124 changes: 124 additions & 0 deletions pkg/apis/multicluster/v1alpha1/gameserverallocationpolicy.go
Expand Up @@ -15,6 +15,9 @@
package v1alpha1

import (
"math/rand"
"sort"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -54,3 +57,124 @@ type GameServerAllocationPolicyList struct {
metav1.ListMeta `json:"metadata,omitempty"`
Items []GameServerAllocationPolicy `json:"items"`
}

// clusterToPolicy map type definition for cluster to policy map
type clusterToPolicy map[string][]*GameServerAllocationPolicy

// ConnectionInfoIterator an iterator on ClusterConnectionInfo
type ConnectionInfoIterator struct {
// currPriority Current priority index from the orderedPriorities
currPriority int
// orderedPriorities list of ordered priorities
orderedPriorities []int
// priorityToCluster Map of priority to cluster-policies map
priorityToCluster map[int]clusterToPolicy
// clusterBlackList the cluster blacklist for the clusters that has already returned
clusterBlackList map[string]bool
}

// Next returns the next ClusterConnectionInfo value if available or nil if iterator reaches the end.
func (it *ConnectionInfoIterator) Next() *ClusterConnectionInfo {
for it.currPriority < len(it.orderedPriorities) {
// Get clusters with the highest priority
currPriority := it.orderedPriorities[it.currPriority]
clusterPolicy := it.priorityToCluster[currPriority]

if result := it.getClusterConnectionInfo(&clusterPolicy); result == nil {
// If there is no cluster with the current priority, choose cluster with next highest priority
it.currPriority++
} else {
// To avoid the same cluster again add that to a black list
it.clusterBlackList[result.ClusterName] = true
return result
}
}

return nil
}

// NewConnectionInfoIterator creates an iterator for connection info
func NewConnectionInfoIterator(policies []*GameServerAllocationPolicy) *ConnectionInfoIterator {
priorityToCluster := make(map[int]clusterToPolicy)
for _, policy := range policies {
priority := policy.Spec.Priority
clusterName := policy.Spec.ConnectionInfo.ClusterName

// 1. Add priorities to the map of priority to cluster-priorities map
clusterPolicy, ok := priorityToCluster[priority]
if !ok {
clusterPolicy = make(clusterToPolicy)
priorityToCluster[priority] = clusterPolicy
}

// 2. Add cluster to the cluster-priorities map
if _, ok := clusterPolicy[clusterName]; !ok {
clusterPolicy[clusterName] = []*GameServerAllocationPolicy{policy}
} else {
clusterPolicy[clusterName] = append(clusterPolicy[clusterName], policy)
}
}

// 3. Sort priorities
priorities := make([]int, 0, len(priorityToCluster))
for k := range priorityToCluster {
priorities = append(priorities, k)
}
sort.Slice(priorities, func(i, j int) bool { return priorities[i] < priorities[j] })

// 4. Store initial values for the iterator
return &ConnectionInfoIterator{priorityToCluster: priorityToCluster, currPriority: 0, orderedPriorities: priorities, clusterBlackList: make(map[string]bool)}
}

// getClusterConnectionInfo returns a ClusterConnectionInfo selected base on weighted randomization.
func (it *ConnectionInfoIterator) getClusterConnectionInfo(clusterPolicy *clusterToPolicy) *ClusterConnectionInfo {
connections := []*ClusterConnectionInfo{}
weights := []int{}
for cluster, policies := range *clusterPolicy {
if _, ok := it.clusterBlackList[cluster]; ok {
continue
}
weights = append(weights, avgWeight(policies))
connections = append(connections, &policies[0].Spec.ConnectionInfo)
}

if len(connections) == 0 {
return nil
}

return selectRandomWeighted(connections, &weights)
}

// avgWeight calculates average over allocation policy Weight field.
func avgWeight(policies []*GameServerAllocationPolicy) int {
if len(policies) == 0 {
return 0
}
var sum int
for _, policy := range policies {
sum += policy.Spec.Weight
}
return sum / len(policies)
}

// selectRandomWeighted selects a ClusterConnectionInfo info from a weighted list of ClusterConnectionInfo
func selectRandomWeighted(connections []*ClusterConnectionInfo, weights *[]int) *ClusterConnectionInfo {
sum := 0
for _, weight := range *weights {
sum += weight
}

if sum <= 0 {
return nil
}

rand := rand.Intn(sum)
sum = 0
for i, weight := range *weights {
sum += weight
if rand < sum {
return connections[i]
}
}
return nil
}
252 changes: 252 additions & 0 deletions pkg/apis/multicluster/v1alpha1/gameserverallocationpolicy_test.go
@@ -0,0 +1,252 @@
// Copyright 2019 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConnectionInfoIterator(t *testing.T) {
testCases := []struct {
name string
in []*GameServerAllocationPolicy
want []ClusterConnectionInfo
unordered bool
}{
{
name: "Simple test",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
SecretName: "secret-name",
APIServerEndpoint: "api-server-endpoint",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster1",
SecretName: "secret-name",
APIServerEndpoint: "api-server-endpoint",
},
},
},
{
name: "Different priorities and weight same cluster",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster-name",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 2,
Weight: 300,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster-name",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster-name",
},
},
},
{
name: "Different clusters same priority",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 300,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster2",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster1",
},
{
ClusterName: "cluster2",
},
},
unordered: true,
},
{
name: "Different clusters different priorities",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 300,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 2,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster2",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster1",
},
{
ClusterName: "cluster2",
},
},
},
{
name: "Different clusters repeated with different priorities",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 300,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster2",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 2,
Weight: 300,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 2,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster2",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster1",
},
{
ClusterName: "cluster2",
},
},
unordered: true,
},
{
name: "Zero weight never chosen",
in: []*GameServerAllocationPolicy{
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 0,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster1",
},
},
},
{
Spec: GameServerAllocationPolicySpec{
Priority: 1,
Weight: 100,
ConnectionInfo: ClusterConnectionInfo{
ClusterName: "cluster2",
},
},
},
},
want: []ClusterConnectionInfo{
{
ClusterName: "cluster2",
},
},
},
{
name: "Empty policy list",
in: []*GameServerAllocationPolicy{},
want: nil,
},
{
name: "Nil policy list",
in: nil,
want: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var results []ClusterConnectionInfo
iterator := NewConnectionInfoIterator(tc.in)
for {
connectionInfo := iterator.Next()
if connectionInfo == nil {
break
}
results = append(results, *connectionInfo)
}

if tc.unordered {
assert.ElementsMatch(t, tc.want, results, "Failed test \"%s\"", tc.name)
} else {
assert.Equal(t, tc.want, results, "Failed test \"%s\"", tc.name)
}
})
}
}

0 comments on commit fd86d86

Please sign in to comment.