Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding timeout to remote cluster allocate call and adding total timeout to allocate #1815

Merged
merged 29 commits into from Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/allocator/main.go
Expand Up @@ -89,7 +89,7 @@ func main() {
return err
})

h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled)
h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout)

listener, err := net.Listen("tcp", fmt.Sprintf(":%s", sslPort))
if err != nil {
Expand Down Expand Up @@ -182,7 +182,7 @@ func main() {
logger.WithError(err).Fatal("allocation service crashed")
}

func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool) *serviceHandler {
func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
Expand All @@ -192,7 +192,9 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(),
kubeInformerFactory.Core().V1().Secrets(),
kubeClient,
gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health))
gameserverallocations.NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), gsCounter, health),
remoteAllocationTimeout,
totalRemoteAllocationTimeout)

stop := signals.NewStopChannel()
h := serviceHandler{
Expand Down
45 changes: 27 additions & 18 deletions cmd/allocator/metrics.go
Expand Up @@ -16,6 +16,7 @@ package main
import (
"net/http"
"strings"
"time"

"agones.dev/agones/pkg/metrics"
"agones.dev/agones/pkg/util/runtime"
Expand All @@ -28,25 +29,29 @@ import (
)

const (
enableStackdriverMetricsFlag = "stackdriver-exporter"
enablePrometheusMetricsFlag = "prometheus-exporter"
projectIDFlag = "gcp-project-id"
stackdriverLabels = "stackdriver-labels"
mTLSDisabledFlag = "disable-mtls"
tlsDisabledFlag = "disable-tls"
enableStackdriverMetricsFlag = "stackdriver-exporter"
enablePrometheusMetricsFlag = "prometheus-exporter"
projectIDFlag = "gcp-project-id"
stackdriverLabels = "stackdriver-labels"
mTLSDisabledFlag = "disable-mtls"
tlsDisabledFlag = "disable-tls"
remoteAllocationTimeoutFlag = "remote-allocation-timeout"
totalRemoteAllocationTimeoutFlag = "total-remote-allocation-timeout"
)

func init() {
registerMetricViews()
}

type config struct {
TLSDisabled bool
MTLSDisabled bool
PrometheusMetrics bool
Stackdriver bool
GCPProjectID string
StackdriverLabels string
TLSDisabled bool
MTLSDisabled bool
PrometheusMetrics bool
Stackdriver bool
GCPProjectID string
StackdriverLabels string
totalRemoteAllocationTimeout time.Duration
remoteAllocationTimeout time.Duration
}

func parseEnvFlags() config {
Expand All @@ -64,6 +69,8 @@ func parseEnvFlags() config {
pflag.String(stackdriverLabels, viper.GetString(stackdriverLabels), "A set of default labels to add to all stackdriver metrics generated. By default metadata is automatically added using the Kubernetes API and the GCP metadata endpoint.")
pflag.Bool(mTLSDisabledFlag, viper.GetBool(mTLSDisabledFlag), "Flag to enable/disable mTLS in the allocator.")
pflag.Bool(tlsDisabledFlag, viper.GetBool(tlsDisabledFlag), "Flag to enable/disable TLS in the allocator.")
pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.")
pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
runtime.FeaturesBindFlags()
pflag.Parse()

Expand All @@ -80,12 +87,14 @@ func parseEnvFlags() config {
runtime.Must(runtime.ParseFeaturesFromEnv())

return config{
PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
GCPProjectID: viper.GetString(projectIDFlag),
StackdriverLabels: viper.GetString(stackdriverLabels),
MTLSDisabled: viper.GetBool(mTLSDisabledFlag),
TLSDisabled: viper.GetBool(tlsDisabledFlag),
PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
GCPProjectID: viper.GetString(projectIDFlag),
StackdriverLabels: viper.GetString(stackdriverLabels),
MTLSDisabled: viper.GetBool(mTLSDisabledFlag),
TLSDisabled: viper.GetBool(tlsDisabledFlag),
remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag),
totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Expand Up @@ -209,7 +209,7 @@ func main() {
gsSetController := gameserversets.NewController(wh, health, gsCounter,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory)
gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second)
fasController := fleetautoscalers.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)

Expand Down
4 changes: 4 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
Expand Up @@ -132,6 +132,10 @@ spec:
value: {{ .Values.agones.allocator.disableMTLS | quote }}
- name: DISABLE_TLS
value: {{ .Values.agones.allocator.disableTLS | quote }}
- name: REMOTE_ALLOCATION_TIMEOUT
value: {{ .Values.agones.allocator.remoteAllocationTimeout | quote }}
- name: TOTAL_REMOTE_ALLOCATION_TIMEOUT
value: {{ .Values.agones.allocator.totalRemoteAllocationTimeout | quote }}
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
2 changes: 2 additions & 0 deletions install/helm/agones/values.yaml
Expand Up @@ -135,6 +135,8 @@ agones:
generateClientTLS: true
disableMTLS: false
disableTLS: false
remoteAllocationTimeout: 10s
totalRemoteAllocationTimeout: 30s
image:
registry: gcr.io/agones-images
tag: 1.10.0-dev
Expand Down
4 changes: 4 additions & 0 deletions install/yaml/install.yaml
Expand Up @@ -1581,6 +1581,10 @@ spec:
value: "false"
- name: DISABLE_TLS
value: "false"
- name: REMOTE_ALLOCATION_TIMEOUT
value: "10s"
- name: TOTAL_REMOTE_ALLOCATION_TIMEOUT
value: "30s"
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
61 changes: 39 additions & 22 deletions pkg/gameserverallocations/allocator.go
Expand Up @@ -62,6 +62,8 @@ var (
ErrNoGameServerReady = errors.New("Could not find a Ready GameServer")
// ErrConflictInGameServerSelection is returned when the candidate gameserver already allocated
ErrConflictInGameServerSelection = errors.New("The Gameserver was already allocated")
// ErrTotalTimeoutExceeded is used to signal that total retry timeout has been exceeded and no additional retries should be made
ErrTotalTimeoutExceeded = status.Errorf(codes.DeadlineExceeded, "remote allocation retry timeout exceeded")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/retry/total

)

const (
Expand Down Expand Up @@ -97,16 +99,18 @@ var remoteAllocationRetry = wait.Backoff{

// Allocator handles game server allocation
type Allocator struct {
baseLogger *logrus.Entry
allocationPolicyLister multiclusterlisterv1.GameServerAllocationPolicyLister
allocationPolicySynced cache.InformerSynced
secretLister corev1lister.SecretLister
secretSynced cache.InformerSynced
recorder record.EventRecorder
pendingRequests chan request
readyGameServerCache *ReadyGameServerCache
topNGameServerCount int
remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
baseLogger *logrus.Entry
allocationPolicyLister multiclusterlisterv1.GameServerAllocationPolicyLister
allocationPolicySynced cache.InformerSynced
secretLister corev1lister.SecretLister
secretSynced cache.InformerSynced
recorder record.EventRecorder
pendingRequests chan request
readyGameServerCache *ReadyGameServerCache
topNGameServerCount int
remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
remoteAllocationTimeout time.Duration
totalRemoteAllocationTimeout time.Duration
}

// request is an async request for allocation
Expand All @@ -124,24 +128,28 @@ type response struct {

// NewAllocator creates an instance of Allocator
func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPolicyInformer, secretInformer informercorev1.SecretInformer,
kubeClient kubernetes.Interface, readyGameServerCache *ReadyGameServerCache) *Allocator {
kubeClient kubernetes.Interface, readyGameServerCache *ReadyGameServerCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *Allocator {
ah := &Allocator{
pendingRequests: make(chan request, maxBatchQueue),
allocationPolicyLister: policyInformer.Lister(),
allocationPolicySynced: policyInformer.Informer().HasSynced,
secretLister: secretInformer.Lister(),
secretSynced: secretInformer.Informer().HasSynced,
readyGameServerCache: readyGameServerCache,
topNGameServerCount: topNGameServerDefaultCount,
remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
pendingRequests: make(chan request, maxBatchQueue),
allocationPolicyLister: policyInformer.Lister(),
allocationPolicySynced: policyInformer.Informer().HasSynced,
secretLister: secretInformer.Lister(),
secretSynced: secretInformer.Informer().HasSynced,
readyGameServerCache: readyGameServerCache,
topNGameServerCount: topNGameServerDefaultCount,
remoteAllocationTimeout: remoteAllocationTimeout,
totalRemoteAllocationTimeout: totalRemoteAllocationTimeout,
remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
conn, err := grpc.Dial(endpoint, dialOpts)
if err != nil {
return nil, err
}
defer conn.Close() // nolint: errcheck

allocationCtx, cancel := context.WithTimeout(ctx, remoteAllocationTimeout)
defer cancel() // nolint: errcheck
grpcClient := pb.NewAllocationServiceClient(conn)
return grpcClient.Allocate(context.Background(), request)
return grpcClient.Allocate(allocationCtx, request)
},
}

Expand Down Expand Up @@ -336,13 +344,19 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
request.MultiClusterSetting.Enabled = false
request.Namespace = connectionInfo.Namespace

ctx, cancel := context.WithTimeout(context.Background(), c.totalRemoteAllocationTimeout)
defer cancel() // nolint: errcheck
// Retry on remote call failures.
err = Retry(remoteAllocationRetry, func() error {
for i, ip := range connectionInfo.AllocationEndpoints {
select {
pooneh-m marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return ErrTotalTimeoutExceeded
default:
}
endpoint := addPort(ip)
c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request")

allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request)
allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request)
if err != nil {
c.baseLogger.Errorf("remote allocation failed with: %v", err)
// If there are multiple endpoints for the allocator connection and the current one is
Expand All @@ -356,6 +370,7 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
}
break
}

return nil
})

Expand Down Expand Up @@ -565,6 +580,8 @@ func Retry(backoff wait.Backoff, fn func() error) error {
return true, nil
case err == ErrNoGameServerReady:
return true, err
case err == ErrTotalTimeoutExceeded:
return true, err
default:
lastConflictErr = err
return false, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/gameserverallocations/controller.go
Expand Up @@ -59,14 +59,18 @@ func NewController(apiServer *apiserver.APIServer,
kubeInformerFactory informers.SharedInformerFactory,
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory,
remoteAllocationTimeout time.Duration,
totalAllocationTimeout time.Duration,
) *Controller {
c := &Controller{
api: apiServer,
allocator: NewAllocator(
agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(),
kubeInformerFactory.Core().V1().Secrets(),
kubeClient,
NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), counter, health)),
NewReadyGameServerCache(agonesInformerFactory.Agones().V1().GameServers(), agonesClient.AgonesV1(), counter, health),
remoteAllocationTimeout,
totalAllocationTimeout),
}
c.baseLogger = runtime.NewLoggerWithType(c)

Expand Down