Skip to content

Commit

Permalink
Add gRPC dial timeout to game server allocation policy (#1700)
Browse files Browse the repository at this point in the history
When using the multicluster allocator dialling remote clusters can cause
allocation requests to block for 2 minutes +

In order to control dial timeout of allocation requests Timeout, BackoffCap properties
were added to game server allocation policy
  • Loading branch information
yeukovichd committed Sep 24, 2020
1 parent 5709810 commit 990cb13
Show file tree
Hide file tree
Showing 8 changed files with 2,316 additions and 2,196 deletions.
11 changes: 11 additions & 0 deletions cmd/allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,19 @@ type serviceHandler struct {
// Allocate implements the Allocate gRPC method definition
func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) (*pb.AllocationResponse, error) {
logger.WithField("request", in).Infof("allocation request received.")

switch ctx.Err() {
case context.Canceled:
logger.Info("allocation request canceled by client, abandoning")
return nil, status.Error(codes.Canceled, "allocation request canceled by client, abandoning")
case context.DeadlineExceeded:
logger.Info("allocation request deadline exceedeed, abandoning")
return nil, status.Error(codes.DeadlineExceeded, "allocation request deadline exceeded, abandoning")
}

gsa := converters.ConvertAllocationRequestToGSA(in)
resultObj, err := h.allocationCallback(gsa)

if err != nil {
logger.WithField("gsa", gsa).WithError(err).Info("allocation failed")
return nil, err
Expand Down
47 changes: 47 additions & 0 deletions cmd/allocator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"os"
"testing"
"time"

pb "agones.dev/agones/pkg/allocation/go"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
Expand Down Expand Up @@ -106,6 +107,52 @@ func TestGetTlsCert(t *testing.T) {
assert.Equal(t, cert2.Certificate, retrievedCert2.Certificate, "expected the retrieved cert to be equal to the original one")
}

func TestContextDeadlineExceeded(t *testing.T) {
t.Parallel()

handler := serviceHandler{}
request := pb.AllocationRequest{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Nanosecond))
defer cancel()
_, err := handler.Allocate(ctx, &request)

if !assert.Error(t, err, "expecting failure") {
return
}

st, ok := status.FromError(err)

if !ok {
t.Errorf("expecting status error: %v", err)
}

assert.Equal(t, codes.DeadlineExceeded, st.Code())
assert.Contains(t, st.Message(), "allocation request deadline exceeded, abandoning")
}

func TestContextCancelation(t *testing.T) {
t.Parallel()

handler := serviceHandler{}
request := pb.AllocationRequest{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
cancel()
_, err := handler.Allocate(ctx, &request)

if !assert.Error(t, err, "expecting failure") {
return
}

st, ok := status.FromError(err)

if !ok {
t.Errorf("expecting status error: %v", err)
}

assert.Equal(t, codes.Canceled, st.Code())
assert.Contains(t, st.Message(), "allocation request canceled by client, abandoning")
}

func TestHandlingStatus(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ spec:
minItems: 1
clusterName:
type: string
timeout:
format: int64
minimum: 0
type: integer
backoffCap:
format: int64
minimum: 0
type: integer
namespace:
type: string
required:
Expand Down
8 changes: 8 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,14 @@ spec:
minItems: 1
clusterName:
type: string
timeout:
format: int64
minimum: 0
type: integer
backoffCap:
format: int64
minimum: 0
type: integer
namespace:
type: string
required:
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/multicluster/v1/gameserverallocationpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type ClusterConnectionInfo struct {
Namespace string `json:"namespace"`
// The PEM encoded server CA, used by the allocator client to authenticate the remote server.
ServerCA []byte `json:"serverCa,omitempty"`
// Optional: specifies how long (in milliseconds) gRPC client is willing to wait for the allocator service
// to complete request before it is terminated with the error DEADLINE_EXCEEDED
Timeout int64 `json:"timeout,omitempty"`
// Optional: the maximum duration (in milliseconds) applied to a backoff function
BackoffCap int64 `json:"backoffCap,omitempty"`
}

// +genclient
Expand Down
39 changes: 28 additions & 11 deletions pkg/gameserverallocations/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ var allocationRetry = wait.Backoff{
Jitter: 0.1,
}

var remoteAllocationRetry = wait.Backoff{
Steps: 7,
Duration: 100 * time.Millisecond,
Factor: 2.0,
}

// Allocator handles game server allocation
type Allocator struct {
baseLogger *logrus.Entry
Expand All @@ -106,7 +100,7 @@ type Allocator struct {
pendingRequests chan request
readyGameServerCache *ReadyGameServerCache
topNGameServerCount int
remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
}

// request is an async request for allocation
Expand All @@ -133,15 +127,15 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli
secretSynced: secretInformer.Informer().HasSynced,
readyGameServerCache: readyGameServerCache,
topNGameServerCount: topNGameServerDefaultCount,
remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
conn, err := grpc.Dial(endpoint, dialOpts)
if err != nil {
return nil, err
}
defer conn.Close() // nolint: errcheck

grpcClient := pb.NewAllocationServiceClient(conn)
return grpcClient.Allocate(context.Background(), request)
return grpcClient.Allocate(ctx, request)
},
}

Expand Down Expand Up @@ -335,14 +329,26 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
request := converters.ConvertGSAToAllocationRequest(gsa)
request.MultiClusterSetting.Enabled = false
request.Namespace = connectionInfo.Namespace
backoff := newRemoteAllocationBackoff(time.Duration(connectionInfo.BackoffCap) * time.Millisecond)

// Retry on remote call failures.
err = Retry(remoteAllocationRetry, func() error {
err = Retry(backoff, func() error {
for i, ip := range connectionInfo.AllocationEndpoints {
endpoint := addPort(ip)
c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request")

allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request)
ctx := context.Background()

if connectionInfo.Timeout > 0 {
deadline := time.Now().Add(time.Duration(connectionInfo.Timeout) * time.Millisecond)
ctxWithDeadline, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

ctx = ctxWithDeadline
}

allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request)

if err != nil {
c.baseLogger.Errorf("remote allocation failed with: %v", err)
// If there are multiple enpoints for the allocator connection and the current one is
Expand All @@ -362,6 +368,17 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
return converters.ConvertAllocationResponseToGSA(allocationResponse), err
}

func newRemoteAllocationBackoff(backoffCap time.Duration) wait.Backoff {
backoff := wait.Backoff{
Steps: 7,
Duration: 100 * time.Millisecond,
Factor: 2.0,
Cap: backoffCap,
}

return backoff
}

// createRemoteClusterDialOption creates a grpc client dial option with proper certs to make a remote call.
func (c *Allocator) createRemoteClusterDialOption(namespace string, connectionInfo *multiclusterv1.ClusterConnectionInfo) (grpc.DialOption, error) {
// TODO: disableMTLS works for a single cluster; still need to address how the flag interacts with multi-cluster authentication.
Expand Down
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gameserverallocations

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -968,7 +969,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
},
}

c.allocator.remoteAllocationCallback = func(e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
assert.Equal(t, endpoint+":443", e)
serverResponse := pb.AllocationResponse{
GameServerName: expectedGSName,
Expand All @@ -991,7 +992,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
retry := 0
endpoint := "z.z.z.z"

c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if count == 0 {
serverResponse := pb.AllocationResponse{}
count++
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
healthyEndpoint := "healthy_endpoint:443"

expectedGSName := "mocked"
c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if endpoint == unhealthyEndpoint {
return nil, errors.New("test error message")
}
Expand Down

0 comments on commit 990cb13

Please sign in to comment.