adding timeout to remote cluster allocate call and adding total timeout to allocate (googleforgames#1815)

* adding timeout to allocate call and adding total timeout to allocate

* adding explicit context check to retry

* added part of plumbing for timeout flags

* allocation

* generated install.yaml

* retry test

* Added test

* cleanup of fake controller

* test improvement

* Added docs

* linting

* lint

* fixing yamls

* fixes

* testcase for illustration

* custom error used to exit Retry

* comment

* test

* fixes

* fixes

* fixes

* fixes

* potential e2e test
Dmytro Kislov authored and ilkercelikyilmaz committed Oct 23, 2020
1 parent 2bbd05b commit 722efcb
Showing 10 changed files with 244 additions and 51 deletions.
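The change layers two deadlines: each remote allocation attempt runs under its own per-call timeout (remoteAllocationTimeout, default 10s), and all attempts together are bounded by a total timeout (totalRemoteAllocationTimeout, default 30s). A minimal sketch of how the two contexts nest, using illustrative durations and none of the actual Agones types:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Total budget across all retries, in the role of totalRemoteAllocationTimeout.
	total, cancelTotal := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelTotal()

	// Each attempt derives a per-call deadline from the total budget, in the role
	// of remoteAllocationTimeout; whichever deadline comes first wins.
	attempt, cancelAttempt := context.WithTimeout(total, 10*time.Second)
	defer cancelAttempt()

	deadline, _ := attempt.Deadline()
	fmt.Println("this attempt must finish by:", deadline)
}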
8 changes: 5 additions & 3 deletions cmd/allocator/main.go
@@ -89,7 +89,7 @@ func main() {
return err
})

- h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled)
+ h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout)

listener, err := net.Listen("tcp", fmt.Sprintf(":%s", sslPort))
if err != nil {
@@ -182,7 +182,7 @@ func main() {
logger.WithError(err).Fatal("allocation service crashed")
}

- func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool) *serviceHandler {
+ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
@@ -192,7 +192,7 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(),
kubeInformerFactory.Core().V1().Secrets(),
kubeClient,
- gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health))
+ gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health),
+ remoteAllocationTimeout,
+ totalRemoteAllocationTimeout)

stop := signals.NewStopChannel()
h := serviceHandler{
47 changes: 29 additions & 18 deletions cmd/allocator/metrics.go
@@ -16,6 +16,7 @@ package main
import (
"net/http"
"strings"
"time"

"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/runtime"
@@ -28,25 +29,29 @@ import (
)

const (
- enableStackdriverMetricsFlag = "stackdriver-exporter"
- enablePrometheusMetricsFlag = "prometheus-exporter"
- projectIDFlag = "gcp-project-id"
- stackdriverLabels = "stackdriver-labels"
- mTLSDisabledFlag = "disable-mtls"
- tlsDisabledFlag = "disable-tls"
+ enableStackdriverMetricsFlag = "stackdriver-exporter"
+ enablePrometheusMetricsFlag = "prometheus-exporter"
+ projectIDFlag = "gcp-project-id"
+ stackdriverLabels = "stackdriver-labels"
+ mTLSDisabledFlag = "disable-mtls"
+ tlsDisabledFlag = "disable-tls"
+ remoteAllocationTimeoutFlag = "remote-allocation-timeout"
+ totalRemoteAllocationTimeoutFlag = "total-remote-allocation-timeout"
)

func init() {
registerMetricViews()
}

type config struct {
- TLSDisabled bool
- MTLSDisabled bool
- PrometheusMetrics bool
- Stackdriver bool
- GCPProjectID string
- StackdriverLabels string
+ TLSDisabled bool
+ MTLSDisabled bool
+ PrometheusMetrics bool
+ Stackdriver bool
+ GCPProjectID string
+ StackdriverLabels string
+ totalRemoteAllocationTimeout time.Duration
+ remoteAllocationTimeout time.Duration
}

func parseEnvFlags() config {
@@ -57,13 +62,17 @@ func parseEnvFlags() config {
viper.SetDefault(stackdriverLabels, "")
viper.SetDefault(mTLSDisabledFlag, false)
viper.SetDefault(tlsDisabledFlag, false)
+ viper.SetDefault(remoteAllocationTimeoutFlag, 10*time.Second)
+ viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second)

pflag.Bool(enablePrometheusMetricsFlag, viper.GetBool(enablePrometheusMetricsFlag), "Flag to activate metrics of Agones. Can also use PROMETHEUS_EXPORTER env variable.")
pflag.Bool(enableStackdriverMetricsFlag, viper.GetBool(enableStackdriverMetricsFlag), "Flag to activate stackdriver monitoring metrics for Agones. Can also use STACKDRIVER_EXPORTER env variable.")
pflag.String(projectIDFlag, viper.GetString(projectIDFlag), "GCP ProjectID used for Stackdriver, if not specified ProjectID from Application Default Credentials would be used. Can also use GCP_PROJECT_ID env variable.")
pflag.String(stackdriverLabels, viper.GetString(stackdriverLabels), "A set of default labels to add to all stackdriver metrics generated. By default metadata are automatically added using Kubernetes API and GCP metadata endpoint.")
pflag.Bool(mTLSDisabledFlag, viper.GetBool(mTLSDisabledFlag), "Flag to enable/disable mTLS in the allocator.")
pflag.Bool(tlsDisabledFlag, viper.GetBool(tlsDisabledFlag), "Flag to enable/disable TLS in the allocator.")
+ pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.")
+ pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
runtime.FeaturesBindFlags()
pflag.Parse()

@@ -80,12 +89,14 @@ func parseEnvFlags() config {
runtime.Must(runtime.ParseFeaturesFromEnv())

return config{
- PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
- Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
- GCPProjectID: viper.GetString(projectIDFlag),
- StackdriverLabels: viper.GetString(stackdriverLabels),
- MTLSDisabled: viper.GetBool(mTLSDisabledFlag),
- TLSDisabled: viper.GetBool(tlsDisabledFlag),
+ PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
+ Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
+ GCPProjectID: viper.GetString(projectIDFlag),
+ StackdriverLabels: viper.GetString(stackdriverLabels),
+ MTLSDisabled: viper.GetBool(mTLSDisabledFlag),
+ TLSDisabled: viper.GetBool(tlsDisabledFlag),
+ remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag),
+ totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
}
}

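The flag wiring above follows the usual viper-plus-pflag pattern: register a default, declare a matching pflag, then read the merged result so a command-line flag or environment variable can override the default. A standalone sketch of just the new duration flag; the flag name and default come from the diff, while the env binding shown here is an assumption about how the allocator maps flags to variables like REMOTE_ALLOCATION_TIMEOUT:

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	viper.SetDefault("remote-allocation-timeout", 10*time.Second)
	pflag.Duration("remote-allocation-timeout", viper.GetDuration("remote-allocation-timeout"),
		"Flag to set remote allocation call timeout.")
	pflag.Parse()

	// Map dashes to underscores so REMOTE_ALLOCATION_TIMEOUT=5s overrides the
	// default, and bind the pflag so --remote-allocation-timeout=5s does too.
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
	viper.AutomaticEnv()
	_ = viper.BindPFlags(pflag.CommandLine)

	fmt.Println("remote allocation timeout:", viper.GetDuration("remote-allocation-timeout"))
}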
2 changes: 1 addition & 1 deletion cmd/controller/main.go
@@ -209,7 +209,7 @@ func main() {
gsSetController := gameserversets.NewController(wh, health, gsCounter,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
- gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory)
+ gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second)
fasController := fleetautoscalers.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)

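Note that the controller wires fixed 10s and 30s values into NewController here, while cmd/allocator above threads the same two timeouts through as configurable flags.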
4 changes: 4 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
@@ -132,6 +132,10 @@ spec:
value: {{ .Values.agones.allocator.disableMTLS | quote }}
- name: DISABLE_TLS
value: {{ .Values.agones.allocator.disableTLS | quote }}
+ - name: REMOTE_ALLOCATION_TIMEOUT
+ value: {{ .Values.agones.allocator.remoteAllocationTimeout | quote }}
+ - name: TOTAL_REMOTE_ALLOCATION_TIMEOUT
+ value: {{ .Values.agones.allocator.totalRemoteAllocationTimeout | quote }}
- name: POD_NAME
valueFrom:
fieldRef:
2 changes: 2 additions & 0 deletions install/helm/agones/values.yaml
@@ -135,6 +135,8 @@ agones:
generateClientTLS: true
disableMTLS: false
disableTLS: false
+ remoteAllocationTimeout: 10s
+ totalRemoteAllocationTimeout: 30s
image:
registry: gcr.io/agones-images
tag: 1.10.0-dev
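Being ordinary chart values, these can be overridden at install time, e.g. helm install --set agones.allocator.remoteAllocationTimeout=5s --set agones.allocator.totalRemoteAllocationTimeout=15s.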
4 changes: 4 additions & 0 deletions install/yaml/install.yaml
@@ -1581,6 +1581,10 @@ spec:
value: "false"
- name: DISABLE_TLS
value: "false"
+ - name: REMOTE_ALLOCATION_TIMEOUT
+ value: "10s"
+ - name: TOTAL_REMOTE_ALLOCATION_TIMEOUT
+ value: "30s"
- name: POD_NAME
valueFrom:
fieldRef:
61 changes: 39 additions & 22 deletions pkg/gameserverallocations/allocator.go
@@ -62,6 +62,8 @@ var (
ErrNoGameServerReady = errors.New("Could not find a Ready GameServer")
// ErrConflictInGameServerSelection is returned when the candidate gameserver already allocated
ErrConflictInGameServerSelection = errors.New("The Gameserver was already allocated")
+ // ErrTotalTimeoutExceeded is used to signal that total retry timeout has been exceeded and no additional retries should be made
+ ErrTotalTimeoutExceeded = status.Errorf(codes.DeadlineExceeded, "remote allocation total timeout exceeded")
)

const (
@@ -97,16 +99,18 @@ var remoteAllocationRetry = wait.Backoff{

// Allocator handles game server allocation
type Allocator struct {
- baseLogger *logrus.Entry
- allocationPolicyLister multiclusterlisterv1.GameServerAllocationPolicyLister
- allocationPolicySynced cache.InformerSynced
- secretLister corev1lister.SecretLister
- secretSynced cache.InformerSynced
- recorder record.EventRecorder
- pendingRequests chan request
- readyGameServerCache *ReadyGameServerCache
- topNGameServerCount int
- remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
+ baseLogger *logrus.Entry
+ allocationPolicyLister multiclusterlisterv1.GameServerAllocationPolicyLister
+ allocationPolicySynced cache.InformerSynced
+ secretLister corev1lister.SecretLister
+ secretSynced cache.InformerSynced
+ recorder record.EventRecorder
+ pendingRequests chan request
+ readyGameServerCache *ReadyGameServerCache
+ topNGameServerCount int
+ remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
+ remoteAllocationTimeout time.Duration
+ totalRemoteAllocationTimeout time.Duration
}

// request is an async request for allocation
Expand All @@ -124,24 +128,28 @@ type response struct {

// NewAllocator creates an instance of Allocator
func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPolicyInformer, secretInformer informercorev1.SecretInformer,
- kubeClient kubernetes.Interface, readyGameServerCache *ReadyGameServerCache) *Allocator {
+ kubeClient kubernetes.Interface, readyGameServerCache *ReadyGameServerCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *Allocator {
ah := &Allocator{
- pendingRequests: make(chan request, maxBatchQueue),
- allocationPolicyLister: policyInformer.Lister(),
- allocationPolicySynced: policyInformer.Informer().HasSynced,
- secretLister: secretInformer.Lister(),
- secretSynced: secretInformer.Informer().HasSynced,
- readyGameServerCache: readyGameServerCache,
- topNGameServerCount: topNGameServerDefaultCount,
- remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
+ pendingRequests: make(chan request, maxBatchQueue),
+ allocationPolicyLister: policyInformer.Lister(),
+ allocationPolicySynced: policyInformer.Informer().HasSynced,
+ secretLister: secretInformer.Lister(),
+ secretSynced: secretInformer.Informer().HasSynced,
+ readyGameServerCache: readyGameServerCache,
+ topNGameServerCount: topNGameServerDefaultCount,
+ remoteAllocationTimeout: remoteAllocationTimeout,
+ totalRemoteAllocationTimeout: totalRemoteAllocationTimeout,
+ remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
conn, err := grpc.Dial(endpoint, dialOpts)
if err != nil {
return nil, err
}
defer conn.Close() // nolint: errcheck

+ allocationCtx, cancel := context.WithTimeout(ctx, remoteAllocationTimeout)
+ defer cancel() // nolint: errcheck
grpcClient := pb.NewAllocationServiceClient(conn)
- return grpcClient.Allocate(context.Background(), request)
+ return grpcClient.Allocate(allocationCtx, request)
},
}

@@ -336,13 +344,19 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
request.MultiClusterSetting.Enabled = false
request.Namespace = connectionInfo.Namespace

+ ctx, cancel := context.WithTimeout(context.Background(), c.totalRemoteAllocationTimeout)
+ defer cancel() // nolint: errcheck
// Retry on remote call failures.
err = Retry(remoteAllocationRetry, func() error {
for i, ip := range connectionInfo.AllocationEndpoints {
+ select {
+ case <-ctx.Done():
+ return ErrTotalTimeoutExceeded
+ default:
+ }
endpoint := addPort(ip)
c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request")

- allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request)
+ allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request)
if err != nil {
c.baseLogger.Errorf("remote allocation failed with: %v", err)
// If there are multiple endpoints for the allocator connection and the current one is
@@ -356,6 +370,7 @@ }
}
break
}

return nil
})

@@ -565,6 +580,8 @@ func Retry(backoff wait.Backoff, fn func() error) error {
return true, nil
case err == ErrNoGameServerReady:
return true, err
+ case err == ErrTotalTimeoutExceeded:
+ return true, err
default:
lastConflictErr = err
return false, nil
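The Retry change above introduces ErrTotalTimeoutExceeded as a sentinel: returning it from the retried function makes Retry stop immediately instead of backing off again. A runnable sketch of the same exit pattern with simplified stand-ins (short durations, a plain errors.New sentinel, and a fake remote call; only the sentinel-exit idea is taken from the diff):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTotalTimeout plays the role of ErrTotalTimeoutExceeded.
var errTotalTimeout = errors.New("remote allocation total timeout exceeded")

// retry re-runs fn with a fixed backoff until it succeeds or returns the sentinel.
func retry(fn func() error) error {
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if errors.Is(err, errTotalTimeout) {
			return err // sentinel: give up instead of retrying
		}
		time.Sleep(20 * time.Millisecond)
	}
}

// failingAllocate stands in for a remote call that always burns its per-call deadline.
func failingAllocate(ctx context.Context, perCall time.Duration) error {
	callCtx, cancel := context.WithTimeout(ctx, perCall)
	defer cancel()
	<-callCtx.Done()
	return callCtx.Err()
}

func main() {
	total, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	err := retry(func() error {
		select {
		case <-total.Done():
			return errTotalTimeout // total budget spent before this attempt
		default:
		}
		return failingAllocate(total, 50*time.Millisecond)
	})
	fmt.Println(err) // prints the sentinel once the 200ms budget is exhausted
}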
6 changes: 5 additions & 1 deletion pkg/gameserverallocations/controller.go
@@ -59,14 +59,18 @@ func NewController(apiServer *apiserver.APIServer,
kubeInformerFactory informers.SharedInformerFactory,
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory,
+ remoteAllocationTimeout time.Duration,
+ totalAllocationTimeout time.Duration,
) *Controller {
c := &Controller{
api: apiServer,
allocator: NewAllocator(
agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(),
kubeInformerFactory.Core().V1().Secrets(),
kubeClient,
- NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), counter, health)),
+ NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), counter, health),
+ remoteAllocationTimeout,
+ totalAllocationTimeout),
}
c.baseLogger = runtime.NewLoggerWithType(c)

