From 0cff4258c62eab791b3e1724a82e3d96028b1a19 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 21 Feb 2019 14:27:14 -0800 Subject: [PATCH] Unified logging of resource identifiers so that we can reliably get an entire history of a resource in Stackdriver. Well-known identifiers of resource types are defined in the logfields package. --- pkg/fleetautoscalers/controller.go | 35 +++++++----- pkg/fleets/controller.go | 50 +++++++++------- pkg/gameserverallocations/controller.go | 39 ++++++++----- pkg/gameservers/controller.go | 73 +++++++++++++----------- pkg/gameservers/controller_test.go | 5 +- pkg/gameservers/health.go | 29 ++++++---- pkg/gameserversets/controller.go | 51 ++++++++++------- pkg/sdkserver/sdkserver.go | 7 ++- pkg/util/logfields/logfields.go | 23 ++++++++ pkg/util/workerqueue/workerqueue.go | 27 +++++---- pkg/util/workerqueue/workerqueue_test.go | 6 +- 11 files changed, 214 insertions(+), 131 deletions(-) create mode 100644 pkg/util/logfields/logfields.go diff --git a/pkg/fleetautoscalers/controller.go b/pkg/fleetautoscalers/controller.go index d8b0e27915..c4f7b2ab85 100644 --- a/pkg/fleetautoscalers/controller.go +++ b/pkg/fleetautoscalers/controller.go @@ -20,12 +20,14 @@ import ( "time" "agones.dev/agones/pkg/apis/stable" + "agones.dev/agones/pkg/apis/stable/v1alpha1" stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" "agones.dev/agones/pkg/client/clientset/versioned" getterv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1" "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/crd" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" "agones.dev/agones/pkg/util/workerqueue" @@ -48,7 +50,7 @@ import ( // Controller is a the FleetAutoscaler controller type Controller struct { - logger *logrus.Entry + baseLogger *logrus.Entry crdGetter 
v1beta1.CustomResourceDefinitionInterface fleetGetter getterv1alpha1.FleetsGetter fleetLister listerv1alpha1.FleetLister @@ -79,12 +81,12 @@ func NewController( fleetAutoscalerLister: agonesInformer.FleetAutoscalers().Lister(), fleetAutoscalerSynced: fasInformer.HasSynced, } - c.logger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleetAutoscaler, c.logger, stable.GroupName+".FleetAutoscalerController") + c.baseLogger = runtime.NewLoggerWithType(c) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleetAutoscaler, c.baseLogger, logfields.FleetAutoscalerKey, stable.GroupName+".FleetAutoscalerController") health.AddLivenessCheck("fleetautoscaler-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(c.logger.Infof) + eventBroadcaster.StartLogging(c.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "fleetautoscaler-controller"}) @@ -105,12 +107,12 @@ func NewController( // Run the FleetAutoscaler controller. Will block until stop is closed. 
// Runs threadiness number workers to process the rate limited queue func (c *Controller) Run(workers int, stop <-chan struct{}) error { - err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetautoscalers."+stable.GroupName, c.logger) + err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetautoscalers."+stable.GroupName, c.baseLogger) if err != nil { return err } - c.logger.Info("Wait for cache sync") + c.baseLogger.Info("Wait for cache sync") if !cache.WaitForCacheSync(stop, c.fleetAutoscalerSynced) { return errors.New("failed to wait for caches to sync") } @@ -119,14 +121,22 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { return nil } +func (c *Controller) loggerForFleetAutoscalerKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.FleetAutoscalerKey, key) +} + +func (c *Controller) loggerForFleetAutoscaler(fas *v1alpha1.FleetAutoscaler) *logrus.Entry { + return c.loggerForFleetAutoscalerKey(fas.Namespace+"/"+fas.Name).WithField("fas", fas) +} + // validationHandler will intercept when a FleetAutoscaler is created, and // validate its settings. 
func (c *Controller) validationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("validationHandler") obj := review.Request.Object fas := &stablev1alpha1.FleetAutoscaler{} err := json.Unmarshal(obj.Raw, fas) if err != nil { + c.baseLogger.WithField("review", review).WithError(err).Info("validationHandler") return review, errors.Wrapf(err, "error unmarshalling original FleetAutoscaler json: %s", obj.Raw) } @@ -154,20 +164,20 @@ func (c *Controller) validationHandler(review admv1beta1.AdmissionReview) (admv1 // syncFleetAutoscaler scales the attached fleet and // synchronizes the FleetAutoscaler CRD func (c *Controller) syncFleetAutoscaler(key string) error { - c.logger.WithField("key", key).Info("Synchronising") + c.loggerForFleetAutoscalerKey(key).Info("Synchronising") // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(c.loggerForFleetAutoscalerKey(key), errors.Wrapf(err, "invalid resource key")) return nil } fas, err := c.fleetAutoscalerLister.FleetAutoscalers(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("key", key).Info(fmt.Sprintf("FleetAutoscaler %s from namespace %s is no longer available for syncing", name, namespace)) + c.loggerForFleetAutoscalerKey(key).Info(fmt.Sprintf("FleetAutoscaler %s from namespace %s is no longer available for syncing", name, namespace)) return nil } return errors.Wrapf(err, "error retrieving FleetAutoscaler %s from namespace %s", name, namespace) @@ -177,10 +187,7 @@ func (c *Controller) syncFleetAutoscaler(key string) error { fleet, err := c.fleetLister.Fleets(namespace).Get(fas.Spec.FleetName) if err != nil { if k8serrors.IsNotFound(err) { - 
logrus.WithError(err).WithField("fleetAutoscaler", fas.Name). - WithField("fleet", fas.Spec.FleetName). - WithField("namespace", namespace). - Warn("Could not find fleet for autoscaler. Skipping.") + c.loggerForFleetAutoscaler(fas).Warn("Could not find fleet for autoscaler. Skipping.") c.recorder.Eventf(fas, corev1.EventTypeWarning, "FailedGetFleet", "could not fetch fleet: %s", fas.Spec.FleetName) diff --git a/pkg/fleets/controller.go b/pkg/fleets/controller.go index d92c4398f7..714a4102f2 100644 --- a/pkg/fleets/controller.go +++ b/pkg/fleets/controller.go @@ -26,6 +26,7 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/crd" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" "agones.dev/agones/pkg/util/workerqueue" @@ -50,7 +51,7 @@ import ( // Controller is a the GameServerSet controller type Controller struct { - logger *logrus.Entry + baseLogger *logrus.Entry crdGetter v1beta1.CustomResourceDefinitionInterface gameServerSetGetter getterv1alpha1.GameServerSetsGetter gameServerSetLister listerv1alpha1.GameServerSetLister @@ -87,12 +88,12 @@ func NewController( fleetSynced: fInformer.HasSynced, } - c.logger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleet, c.logger, stable.GroupName+".FleetController") + c.baseLogger = runtime.NewLoggerWithType(c) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleet, c.baseLogger, logfields.FleetKey, stable.GroupName+".FleetController") health.AddLivenessCheck("fleet-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(c.logger.Infof) + eventBroadcaster.StartLogging(c.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = 
eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "fleet-controller"}) @@ -126,7 +127,7 @@ func NewController( // Should only be called on fleet create operations. // nolint:dupl func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationMutationHandler") + c.baseLogger.WithField("review", review).Info("creationMutationHandler") obj := review.Request.Object fleet := &stablev1alpha1.Fleet{} @@ -154,7 +155,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) return review, errors.Wrapf(err, "error creating json for patch for Fleet %s", fleet.ObjectMeta.Name) } - c.logger.WithField("fleet", fleet.ObjectMeta.Name).WithField("patch", string(jsn)).Infof("patch created!") + c.loggerForFleet(fleet).WithField("patch", string(jsn)).Infof("patch created!") pt := admv1beta1.PatchTypeJSONPatch review.Response.PatchType = &pt @@ -166,7 +167,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) // creationValidationHandler that validates a Fleet when it is created // Should only be called on Fleet create and Update operations. func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationValidationHandler") + c.baseLogger.WithField("review", review).Info("creationValidationHandler") obj := review.Request.Object fleet := &stablev1alpha1.Fleet{} @@ -191,7 +192,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview Details: &details, } - c.logger.WithField("review", review).Info("Invalid Fleet") + c.loggerForFleet(fleet).WithField("review", review).Info("Invalid Fleet") return review, nil } @@ -201,12 +202,12 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview // Run the Fleet controller. 
Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue func (c *Controller) Run(workers int, stop <-chan struct{}) error { - err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.logger) + err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.baseLogger) if err != nil { return err } - c.logger.Info("Wait for cache sync") + c.baseLogger.Info("Wait for cache sync") if !cache.WaitForCacheSync(stop, c.gameServerSetSynced, c.fleetSynced) { return errors.New("failed to wait for caches to sync") } @@ -215,6 +216,14 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { return nil } +func (c *Controller) loggerForFleetKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.FleetKey, key) +} + +func (c *Controller) loggerForFleet(f *v1alpha1.Fleet) *logrus.Entry { + return c.loggerForFleetKey(f.Namespace+"/"+f.Name).WithField("fleet", f) +} + // gameServerSetEventHandler enqueues the owning Fleet for this GameServerSet, // assuming that it has one func (c *Controller) gameServerSetEventHandler(obj interface{}) { @@ -227,9 +236,9 @@ func (c *Controller) gameServerSetEventHandler(obj interface{}) { fleet, err := c.fleetLister.Fleets(gsSet.ObjectMeta.Namespace).Get(ref.Name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("ref", ref).Info("Owner Fleet no longer available for syncing") + c.baseLogger.WithField("ref", ref).Info("Owner Fleet no longer available for syncing") } else { - runtime.HandleError(c.logger.WithField("fleet", fleet.ObjectMeta.Name).WithField("ref", ref), + runtime.HandleError(c.loggerForFleet(fleet).WithField("ref", ref), errors.Wrap(err, "error retrieving GameServerSet owner")) } return @@ -240,20 +249,20 @@ func (c *Controller) gameServerSetEventHandler(obj interface{}) { // syncFleet synchronised the fleet CRDs and configures/updates // backing GameServerSets func (c *Controller) syncFleet(key 
string) error { - c.logger.WithField("key", key).Info("Synchronising") + c.loggerForFleetKey(key).Info("Synchronising") // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(c.loggerForFleetKey(key), errors.Wrapf(err, "invalid resource key")) return nil } fleet, err := c.fleetLister.Fleets(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("key", key).Info("Fleet is no longer available for syncing") + c.loggerForFleetKey(key).Info("Fleet is no longer available for syncing") return nil } return errors.Wrapf(err, "error retrieving fleet %s from namespace %s", name, namespace) @@ -267,7 +276,7 @@ func (c *Controller) syncFleet(key string) error { active, rest := c.filterGameServerSetByActive(fleet, list) // if there isn't an active gameServerSet, create one (but don't persist yet) if active == nil { - c.logger.WithField("fleet", fleet.ObjectMeta.Name).Info("could not find active GameServerSet, creating") + c.loggerForFleet(fleet).Info("could not find active GameServerSet, creating") active = fleet.GameServerSet() } @@ -358,7 +367,7 @@ func (c *Controller) deleteEmptyGameServerSets(fleet *stablev1alpha1.Fleet, list func (c *Controller) recreateDeployment(fleet *stablev1alpha1.Fleet, rest []*stablev1alpha1.GameServerSet) (int32, error) { for _, gsSet := range rest { if gsSet.Spec.Replicas != 0 { - c.logger.WithField("gameserverset", gsSet.ObjectMeta.Name).Info("applying recreate deployment: scaling to 0") + c.loggerForFleet(fleet).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("applying recreate deployment: scaling to 0") gsSetCopy := gsSet.DeepCopy() gsSetCopy.Spec.Replicas = 0 if _, err := 
c.gameServerSetGetter.GameServerSets(gsSetCopy.ObjectMeta.Namespace).Update(gsSetCopy); err != nil { @@ -421,7 +430,7 @@ func (c *Controller) rollingUpdateActive(fleet *stablev1alpha1.Fleet, active *st replicas = fleet.LowerBoundReplicas(replicas - sumAllocated) } - c.logger.WithField("gameserverset", active.ObjectMeta.Name).WithField("replicas", replicas). + c.loggerForFleet(fleet).WithField("gameserverset", active.ObjectMeta.Name).WithField("replicas", replicas). Info("applying rolling update to active gameserverset") return replicas, nil @@ -454,7 +463,7 @@ func (c *Controller) rollingUpdateRest(fleet *stablev1alpha1.Fleet, rest []*stab gsSetCopy := gsSet.DeepCopy() gsSetCopy.Spec.Replicas = fleet.LowerBoundReplicas(gsSetCopy.Spec.Replicas - unavailable) - c.logger.WithField("gameserverset", gsSet.ObjectMeta.Name).WithField("replicas", gsSetCopy.Spec.Replicas). + c.loggerForFleet(fleet).WithField("gameserverset", gsSet.ObjectMeta.Name).WithField("replicas", gsSetCopy.Spec.Replicas). Info("applying rolling update to inactive gameserverset") if _, err := c.gameServerSetGetter.GameServerSets(gsSetCopy.ObjectMeta.Namespace).Update(gsSetCopy); err != nil { @@ -473,8 +482,7 @@ func (c *Controller) rollingUpdateRest(fleet *stablev1alpha1.Fleet, rest []*stab // updateFleetStatus gets the GameServerSets for this Fleet and then // calculates the counts for the status, and updates the Fleet func (c *Controller) updateFleetStatus(fleet *stablev1alpha1.Fleet) error { - - c.logger.WithField("key", fleet.Name).Info("Update Fleet Status") + c.loggerForFleet(fleet).Info("Update Fleet Status") list, err := ListGameServerSetsByFleetOwner(c.gameServerSetLister, fleet) if err != nil { diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index 41a8fc1c8a..61ecc046a6 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -25,6 +25,7 @@ import ( 
"agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/crd" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" "agones.dev/agones/pkg/util/workerqueue" @@ -55,7 +56,7 @@ var ( // Controller is a the GameServerAllocation controller type Controller struct { - logger *logrus.Entry + baseLogger *logrus.Entry counter *AllocationCounter crdGetter v1beta1.CustomResourceDefinitionInterface gameServerSynced cache.InformerSynced @@ -95,12 +96,12 @@ func NewController(wh *webhooks.WebHook, gameServerAllocationGetter: agonesClient.StableV1alpha1(), allocationMutex: allocationMutex, } - c.logger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncDelete, c.logger, stable.GroupName+".GameServerAllocationController") + c.baseLogger = runtime.NewLoggerWithType(c) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncDelete, c.baseLogger, logfields.GameServerAllocationKey, stable.GroupName+".GameServerAllocationController") health.AddLivenessCheck("gameserverallocation-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(c.logger.Infof) + eventBroadcaster.StartLogging(c.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "GameServerAllocation-controller"}) @@ -124,7 +125,7 @@ func NewController(wh *webhooks.WebHook, // Runs threadiness number workers to process the rate limited queue // Probably only needs 1 worker, as its just deleting unallocated GameServerAllocations func (c *Controller) Run(workers int, stop <-chan struct{}) error { - err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserverallocations."+stable.GroupName, c.logger) + err := 
crd.WaitForEstablishedCRD(c.crdGetter, "gameserverallocations."+stable.GroupName, c.baseLogger) if err != nil { return err } @@ -136,7 +137,7 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { c.stop = stop - c.logger.Info("Wait for cache sync") + c.baseLogger.Info("Wait for cache sync") if !cache.WaitForCacheSync(stop, c.gameServerAllocationSynced) { return errors.New("failed to wait for caches to sync") } @@ -145,16 +146,24 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { return nil } +func (c *Controller) loggerForGameServerAllocationKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.GameServerAllocationKey, key) +} + +func (c *Controller) loggerForGameServerAllocation(gsa *v1alpha1.GameServerAllocation) *logrus.Entry { + return c.loggerForGameServerAllocationKey(gsa.Namespace+"/"+gsa.Name).WithField("gsa", gsa) +} + // creationMutationHandler will intercept when a GameServerAllocation is created, and allocate it a GameServer // assuming that one is available. If not, it will reject the AdmissionReview. 
func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationMutationHandler") + c.baseLogger.WithField("review", review).Info("creationMutationHandler") obj := review.Request.Object gsa := &v1alpha1.GameServerAllocation{} err := json.Unmarshal(obj.Raw, gsa) if err != nil { - c.logger.WithError(err).Error("error unmarchaslling json") + c.baseLogger.WithError(err).Error("error unmarshalling json") return review, errors.Wrapf(err, "error unmarshalling original GameServerAllocation json: %s", obj.Raw) } @@ -179,23 +188,23 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) newFA, err := json.Marshal(gsa) if err != nil { - c.logger.WithError(err).Error("error marshalling") + c.baseLogger.WithError(err).Error("error marshalling") return review, errors.Wrapf(err, "error marshalling GameServerAllocation %s to json", gsa.ObjectMeta.Name) } patch, err := jsonpatch.CreatePatch(obj.Raw, newFA) if err != nil { - c.logger.WithError(err).Error("error creating the patch") + c.baseLogger.WithError(err).Error("error creating the patch") return review, errors.Wrapf(err, "error creating patch for GameServerAllocation %s", gsa.ObjectMeta.Name) } json, err := json.Marshal(patch) if err != nil { - c.logger.WithError(err).Error("error creating the json for the patch") + c.baseLogger.WithError(err).Error("error creating the json for the patch") return review, errors.Wrapf(err, "error creating json for patch for GameServerAllocation %s", gs.ObjectMeta.Name) } - c.logger.WithField("gsa", gsa.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!") + c.loggerForGameServerAllocation(gsa).WithField("patch", string(json)).Infof("patch created!") pt := admv1beta1.PatchTypeJSONPatch review.Response.PatchType = &pt @@ -207,7 +216,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) // GameServerAllocation 
fleetName value // nolint: dupl func (c *Controller) mutationValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("mutationValidationHandler") + c.baseLogger.WithField("review", review).Info("mutationValidationHandler") newGSA := &v1alpha1.GameServerAllocation{} oldGSA := &v1alpha1.GameServerAllocation{} @@ -310,11 +319,11 @@ func (c *Controller) patchMetadata(gs *v1alpha1.GameServer, fam v1alpha1.MetaPat // syncDelete takes unallocated GameServerAllocations, and deletes them! func (c *Controller) syncDelete(key string) error { - c.logger.WithField("key", key).Info("Deleting gameserverallocation") + c.loggerForGameServerAllocationKey(key).Info("Deleting gameserverallocation") namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(c.loggerForGameServerAllocationKey(key), errors.Wrapf(err, "invalid resource key")) return nil } diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 8fd2ddccdc..5e99e4ed6e 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -27,6 +27,7 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/crd" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" "agones.dev/agones/pkg/util/workerqueue" @@ -59,7 +60,7 @@ var ( // Controller is a the main GameServer crd controller type Controller struct { - logger *logrus.Entry + baseLogger *logrus.Entry sidecarImage string alwaysPullSidecarImage bool sidecarCPURequest resource.Quantity @@ -118,16 +119,16 @@ func NewController( healthController: NewHealthController(kubeClient, agonesClient, 
kubeInformerFactory, agonesInformerFactory), } - c.logger = runtime.NewLoggerWithType(c) + c.baseLogger = runtime.NewLoggerWithType(c) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(c.logger.Infof) + eventBroadcaster.StartLogging(c.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"}) - c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerController", fastRateLimiter()) - c.creationWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerCreation", fastRateLimiter()) - c.deletionWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerDeletion", fastRateLimiter()) + c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.baseLogger, logfields.GameServerKey, stable.GroupName+".GameServerController", fastRateLimiter()) + c.creationWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.baseLogger.WithField("subqueue", "creation"), logfields.GameServerKey, stable.GroupName+".GameServerControllerCreation", fastRateLimiter()) + c.deletionWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.baseLogger.WithField("subqueue", "deletion"), logfields.GameServerKey, stable.GroupName+".GameServerControllerDeletion", fastRateLimiter()) health.AddLivenessCheck("gameserver-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) health.AddLivenessCheck("gameserver-creation-workerqueue", healthcheck.Check(c.creationWorkerQueue.Healthy)) health.AddLivenessCheck("gameserver-deletion-workerqueue", healthcheck.Check(c.deletionWorkerQueue.Healthy)) @@ -203,12 +204,11 @@ func fastRateLimiter() workqueue.RateLimiter { // 
Should only be called on gameserver create operations. // nolint:dupl func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationMutationHandler") - obj := review.Request.Object gs := &v1alpha1.GameServer{} err := json.Unmarshal(obj.Raw, gs) if err != nil { + c.baseLogger.WithField("review", review).WithError(err).Info("creationMutationHandler failed to unmarshal JSON") return review, errors.Wrapf(err, "error unmarshalling original GameServer json: %s", obj.Raw) } @@ -231,7 +231,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) return review, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name) } - c.logger.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!") + c.loggerForGameServer(gs).WithField("patch", string(json)).Infof("patch created!") pt := admv1beta1.PatchTypeJSONPatch review.Response.PatchType = &pt @@ -240,18 +240,27 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) return review, nil } +func (c *Controller) loggerForGameServerKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.GameServerKey, key) +} + +func (c *Controller) loggerForGameServer(gs *v1alpha1.GameServer) *logrus.Entry { + return c.loggerForGameServerKey(gs.Namespace+"/"+gs.Name).WithField("gs", gs) +} + // creationValidationHandler that validates a GameServer when it is created // Should only be called on gameserver create operations. 
func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationValidationHandler") - obj := review.Request.Object gs := &v1alpha1.GameServer{} err := json.Unmarshal(obj.Raw, gs) if err != nil { + c.baseLogger.WithField("review", review).WithError(err).Info("creationValidationHandler failed to unmarshal JSON") return review, errors.Wrapf(err, "error unmarshalling original GameServer json: %s", obj.Raw) } + c.loggerForGameServer(gs).WithField("review", review).Info("creationValidationHandler") + ok, causes := gs.Validate() if !ok { review.Response.Allowed = false @@ -268,7 +277,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview Details: &details, } - c.logger.WithField("review", review).Info("Invalid GameServer") + c.loggerForGameServer(gs).WithField("review", review).Info("Invalid GameServer") return review, nil } @@ -280,12 +289,12 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview func (c *Controller) Run(workers int, stop <-chan struct{}) error { c.stop = stop - err := crd.WaitForEstablishedCRD(c.crdGetter, "gameservers.stable.agones.dev", c.logger) + err := crd.WaitForEstablishedCRD(c.crdGetter, "gameservers.stable.agones.dev", c.baseLogger) if err != nil { return err } - c.logger.Info("Wait for cache sync") + c.baseLogger.Info("Wait for cache sync") if !cache.WaitForCacheSync(stop, c.gameServerSynced) { return errors.New("failed to wait for caches to sync") } @@ -319,20 +328,20 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { // syncGameServer synchronises the Pods for the GameServers. 
// and reacts to status changes that can occur through the client SDK func (c *Controller) syncGameServer(key string) error { - c.logger.WithField("key", key).Info("Synchronising") + c.loggerForGameServerKey(key).Info("Synchronising") // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(c.loggerForGameServerKey(key), errors.Wrapf(err, "invalid resource key")) return nil } gs, err := c.gameServerLister.GameServers(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("key", key).Info("GameServer is no longer available for syncing") + c.loggerForGameServerKey(key).Info("GameServer is no longer available for syncing") return nil } return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace) @@ -372,14 +381,14 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *v1alpha1.GameServer) (* return gs, nil } - c.logger.WithField("gs", gs).Info("Syncing with Deletion Timestamp") + c.loggerForGameServer(gs).Info("Syncing with Deletion Timestamp") pods, err := c.listGameServerPods(gs) if err != nil { return gs, err } if len(pods) > 0 { - c.logger.WithField("pods", pods).WithField("gsName", gs.ObjectMeta.Name).Info("Found pods, deleting") + c.loggerForGameServer(gs).WithField("pods", pods).Info("Found pods, deleting") for _, p := range pods { err = c.podGetter.Pods(p.ObjectMeta.Namespace).Delete(p.ObjectMeta.Name, nil) if err != nil { @@ -399,7 +408,7 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *v1alpha1.GameServer) (* } } gsCopy.ObjectMeta.Finalizers = fin - c.logger.WithField("gs", gsCopy).Infof("No pods found, removing finalizer %s", stable.GroupName) + c.loggerForGameServer(gsCopy).Infof("No pods found, removing finalizer 
%s", stable.GroupName) gs, err = c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy) return gs, errors.Wrapf(err, "error removing finalizer for GameServer %s", gsCopy.ObjectMeta.Name) } @@ -415,7 +424,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *v1alpha1.GameServer) gsCopy.Status.State = v1alpha1.GameServerStateCreating c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated") - c.logger.WithField("gs", gsCopy).Info("Syncing Port Allocation GameServerState") + c.loggerForGameServer(gsCopy).Info("Syncing Port Allocation GameServerState") gs, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy) if err != nil { // if the GameServer doesn't get updated with the port data, then put the port @@ -437,7 +446,7 @@ func (c *Controller) syncGameServerCreatingState(gs *v1alpha1.GameServer) (*v1al return gs, nil } - c.logger.WithField("gs", gs).Info("Syncing Create State") + c.loggerForGameServer(gs).Info("Syncing Create State") // Wait for pod cache sync, so that we don't end up with multiple pods for a GameServer if !(cache.WaitForCacheSync(c.stop, c.podSynced)) { @@ -479,7 +488,7 @@ func (c *Controller) syncDevelopmentGameServer(gs *v1alpha1.GameServer) (*v1alph } if !(gs.Status.State == v1alpha1.GameServerStateReady) { - c.logger.WithField("gs", gs).Infof("%s is a development game server and will not be managed by Agones.", gs.Name) + c.loggerForGameServer(gs).Info("GS is a development game server and will not be managed by Agones.") } gsCopy := gs.DeepCopy() @@ -507,14 +516,14 @@ func (c *Controller) createGameServerPod(gs *v1alpha1.GameServer) (*v1alpha1.Gam // this shouldn't happen, but if it does. 
if err != nil { - c.logger.WithField("gameserver", gs).WithError(err).Error("error creating pod from Game Server") + c.loggerForGameServer(gs).WithError(err).Error("error creating pod from Game Server") gs, err = c.moveToErrorState(gs, err.Error()) return gs, err } c.addGameServerHealthCheck(gs, pod) - c.logger.WithField("pod", pod).Info("creating Pod for GameServer") + c.loggerForGameServer(gs).WithField("pod", pod).Info("creating Pod for GameServer") pod, err = c.podGetter.Pods(gs.ObjectMeta.Namespace).Create(pod) if k8serrors.IsAlreadyExists(err) { c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Pod already exists, reused") @@ -522,7 +531,7 @@ func (c *Controller) createGameServerPod(gs *v1alpha1.GameServer) (*v1alpha1.Gam } if err != nil { if k8serrors.IsInvalid(err) { - c.logger.WithField("pod", pod).WithField("gameserver", gs).Errorf("Pod created is invalid") + c.loggerForGameServer(gs).WithField("pod", pod).Errorf("Pod created is invalid") gs, err = c.moveToErrorState(gs, err.Error()) return gs, err } @@ -615,7 +624,7 @@ func (c *Controller) syncGameServerStartingState(gs *v1alpha1.GameServer) (*v1al return gs, nil } - c.logger.WithField("gs", gs).Info("Syncing Starting GameServerState") + c.loggerForGameServer(gs).Info("Syncing Starting GameServerState") // there should be a pod (although it may not have a scheduled container), // so if there is an error of any kind, then move this to queue backoff @@ -644,7 +653,7 @@ func (c *Controller) syncGameServerStartingState(gs *v1alpha1.GameServer) (*v1al // applyGameServerAddressAndPort gets the backing Pod for the GamesServer, // and sets the allocated Address and Port values to it and returns it. 
func (c *Controller) applyGameServerAddressAndPort(gs *v1alpha1.GameServer, pod *corev1.Pod) (*v1alpha1.GameServer, error) { - addr, err := c.address(pod) + addr, err := c.address(gs, pod) if err != nil { return gs, errors.Wrapf(err, "error getting external address for GameServer %s", gs.ObjectMeta.Name) } @@ -673,7 +682,7 @@ func (c *Controller) syncGameServerRequestReadyState(gs *v1alpha1.GameServer) (* return gs, nil } - c.logger.WithField("gs", gs).Info("Syncing RequestReady State") + c.loggerForGameServer(gs).Info("Syncing RequestReady State") gsCopy := gs.DeepCopy() @@ -714,7 +723,7 @@ func (c *Controller) syncGameServerShutdownState(gs *v1alpha1.GameServer) error return nil } - c.logger.WithField("gs", gs).Info("Syncing Shutdown State") + c.loggerForGameServer(gs).Info("Syncing Shutdown State") // be explicit about where to delete. p := metav1.DeletePropagationBackground err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p}) @@ -784,7 +793,7 @@ func (c *Controller) listGameServerPods(gs *v1alpha1.GameServer) ([]*corev1.Pod, // This should be the externalIP, but if the externalIP is // not set, it will fall back to the internalIP with a warning. // (basically because minikube only has an internalIP) -func (c *Controller) address(pod *corev1.Pod) (string, error) { +func (c *Controller) address(gs *v1alpha1.GameServer, pod *corev1.Pod) (string, error) { node, err := c.nodeLister.Get(pod.Spec.NodeName) if err != nil { return "", errors.Wrapf(err, "error retrieving node %s for Pod %s", pod.Spec.NodeName, pod.ObjectMeta.Name) @@ -797,7 +806,7 @@ func (c *Controller) address(pod *corev1.Pod) (string, error) { } // minikube only has an InternalIP on a Node, so we'll fall back to that. - c.logger.WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. 
Falling back to Internal") + c.loggerForGameServer(gs).WithField("node", node.ObjectMeta.Name).Warn("Could not find ExternalIP. Falling back to Internal") for _, a := range node.Status.Addresses { if a.Type == corev1.NodeInternalIP { return a.Address, nil diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index 4a4d2dabb0..d70adc668b 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -1022,6 +1022,9 @@ func TestControllerAddress(t *testing.T) { }, } + dummyGS := &v1alpha1.GameServer{} + dummyGS.Name = "some-gs" + for name, fixture := range fixture { t.Run(name, func(t *testing.T) { c, mocks := newFakeController() @@ -1041,7 +1044,7 @@ func TestControllerAddress(t *testing.T) { _, cancel := agtesting.StartInformers(mocks, c.gameServerSynced, podSynced, nodeSynced) defer cancel() - addr, err := c.address(&pod) + addr, err := c.address(dummyGS, &pod) assert.Nil(t, err) assert.Equal(t, fixture.expectedAddress, addr) }) diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 770364be8f..ea1bb48fe8 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -23,6 +23,7 @@ import ( getterv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1" "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/workerqueue" "github.com/pkg/errors" @@ -44,7 +45,7 @@ import ( // an Unhealthy state if certain pods crash, or can't be assigned a port, and other // similar type conditions. 
type HealthController struct { - logger *logrus.Entry + baseLogger *logrus.Entry podSynced cache.InformerSynced podLister corelisterv1.PodLister gameServerGetter getterv1alpha1.GameServersGetter @@ -65,11 +66,11 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(), } - hc.logger = runtime.NewLoggerWithType(hc) - hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.logger, stable.GroupName+".HealthController") + hc.baseLogger = runtime.NewLoggerWithType(hc) + hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.baseLogger, logfields.GameServerKey, stable.GroupName+".HealthController") eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(hc.logger.Infof) + eventBroadcaster.StartLogging(hc.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) hc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "health-controller"}) @@ -124,22 +125,30 @@ func (hc *HealthController) Run(stop <-chan struct{}) { hc.workerqueue.Run(1, stop) } +func (hc *HealthController) loggerForGameServerKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(hc.baseLogger, logfields.GameServerKey, key) +} + +func (hc *HealthController) loggerForGameServer(gs *v1alpha1.GameServer) *logrus.Entry { + return hc.loggerForGameServerKey(gs.Namespace+"/"+gs.Name).WithField("gs", gs) +} + // syncGameServer sets the GameSerer to Unhealthy, if its state is Ready func (hc *HealthController) syncGameServer(key string) error { - hc.logger.WithField("key", key).Info("Synchronising") + hc.loggerForGameServerKey(key).Info("Synchronising") // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - 
runtime.HandleError(hc.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(hc.loggerForGameServerKey(key), errors.Wrapf(err, "invalid resource key")) return nil } gs, err := hc.gameServerLister.GameServers(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - hc.logger.WithField("key", key).Info("GameServer is no longer available for syncing") + hc.loggerForGameServerKey(key).Info("GameServer is no longer available for syncing") return nil } return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace) @@ -151,18 +160,18 @@ func (hc *HealthController) syncGameServer(key string) error { switch gs.Status.State { case v1alpha1.GameServerStateStarting: - hc.logger.WithField("key", key).Info("GameServer cannot start on this port") + hc.loggerForGameServer(gs).Info("GameServer cannot start on this port") unhealthy = true reason = "No nodes have free ports for the allocated ports" case v1alpha1.GameServerStateReady: - hc.logger.WithField("key", key).Info("GameServer container has terminated") + hc.loggerForGameServer(gs).Info("GameServer container has terminated") unhealthy = true reason = "GameServer container terminated" } if unhealthy { - hc.logger.WithField("gs", gs).Infof("Marking GameServer as GameServerStateUnhealthy") + hc.loggerForGameServer(gs).Infof("Marking GameServer as GameServerStateUnhealthy") gsCopy := gs.DeepCopy() gsCopy.Status.State = v1alpha1.GameServerStateUnhealthy diff --git a/pkg/gameserversets/controller.go b/pkg/gameserversets/controller.go index 8eb3e73d7a..669fec7bec 100644 --- a/pkg/gameserversets/controller.go +++ b/pkg/gameserversets/controller.go @@ -26,6 +26,7 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/crd" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" 
"agones.dev/agones/pkg/util/workerqueue" @@ -64,7 +65,7 @@ const ( // Controller is a the GameServerSet controller type Controller struct { - logger *logrus.Entry + baseLogger *logrus.Entry crdGetter v1beta1.CustomResourceDefinitionInterface gameServerGetter getterv1alpha1.GameServersGetter gameServerLister listerv1alpha1.GameServerLister @@ -103,12 +104,12 @@ func NewController( stateCache: &gameServerStateCache{}, } - c.logger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServerSet, c.logger, stable.GroupName+".GameServerSetController") + c.baseLogger = runtime.NewLoggerWithType(c) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServerSet, c.baseLogger, logfields.GameServerSetKey, stable.GroupName+".GameServerSetController") health.AddLivenessCheck("gameserverset-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(c.logger.Infof) + eventBroadcaster.StartLogging(c.baseLogger.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserverset-controller"}) @@ -149,12 +150,12 @@ func NewController( func (c *Controller) Run(workers int, stop <-chan struct{}) error { c.stop = stop - err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger) + err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.baseLogger) if err != nil { return err } - c.logger.Info("Wait for cache sync") + c.baseLogger.Info("Wait for cache sync") if !cache.WaitForCacheSync(stop, c.gameServerSynced, c.gameServerSetSynced) { return errors.New("failed to wait for caches to sync") } @@ -166,7 +167,7 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { // updateValidationHandler that validates a GameServerSet when is updated // Should only be called on 
gameserverset update operations. func (c *Controller) updateValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("updateValidationHandler") + c.baseLogger.WithField("review", review).Info("updateValidationHandler") newGss := &v1alpha1.GameServerSet{} oldGss := &v1alpha1.GameServerSet{} @@ -197,7 +198,7 @@ func (c *Controller) updateValidationHandler(review admv1beta1.AdmissionReview) Details: &details, } - c.logger.WithField("review", review).Info("Invalid GameServerSet update") + c.loggerForGameServerSet(newGss).WithField("review", review).Info("Invalid GameServerSet update") return review, nil } @@ -207,7 +208,7 @@ func (c *Controller) updateValidationHandler(review admv1beta1.AdmissionReview) // creationValidationHandler that validates a GameServerSet when is created // Should only be called on gameserverset create operations. func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { - c.logger.WithField("review", review).Info("creationValidationHandler") + c.baseLogger.WithField("review", review).Info("creationValidationHandler") newGss := &v1alpha1.GameServerSet{} @@ -232,7 +233,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview Details: &details, } - c.logger.WithField("review", review).Info("Invalid GameServerSet update") + c.loggerForGameServerSet(newGss).WithField("review", review).Info("Invalid GameServerSet update") return review, nil } @@ -252,9 +253,9 @@ func (c *Controller) gameServerEventHandler(obj interface{}) { gsSet, err := c.gameServerSetLister.GameServerSets(gs.ObjectMeta.Namespace).Get(ref.Name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("ref", ref).Info("Owner GameServerSet no longer available for syncing") + c.baseLogger.WithField("ref", ref).Info("Owner GameServerSet no longer available for syncing") } else { - 
runtime.HandleError(c.logger.WithField("gs", gs.ObjectMeta.Name).WithField("ref", ref), + runtime.HandleError(c.baseLogger.WithField("gsKey", gs.ObjectMeta.Namespace+"/"+gs.ObjectMeta.Name).WithField("ref", ref), errors.Wrap(err, "error retrieving GameServer owner")) } return @@ -262,24 +263,32 @@ func (c *Controller) gameServerEventHandler(obj interface{}) { c.workerqueue.EnqueueImmediately(gsSet) } +func (c *Controller) loggerForGameServerSetKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.GameServerSetKey, key) +} + +func (c *Controller) loggerForGameServerSet(gsSet *v1alpha1.GameServerSet) *logrus.Entry { + return c.loggerForGameServerSetKey(gsSet.Namespace+"/"+gsSet.Name).WithField("gss", gsSet) +} + // syncGameServer synchronises the GameServers for the Set, // making sure there are aways as many GameServers as requested func (c *Controller) syncGameServerSet(key string) error { - c.logger.WithField("key", key).Info("syncGameServerSet") - defer c.logger.WithField("key", key).Info("syncGameServerSet finished") + c.loggerForGameServerSetKey(key).Info("syncGameServerSet") + defer c.loggerForGameServerSetKey(key).Info("syncGameServerSet finished") // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { // don't return an error, as we don't want this retried - runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key")) + runtime.HandleError(c.loggerForGameServerSetKey(key), errors.Wrapf(err, "invalid resource key")) return nil } gsSet, err := c.gameServerSetLister.GameServerSets(namespace).Get(name) if err != nil { if k8serrors.IsNotFound(err) { - c.logger.WithField("key", key).Info("GameServerSet is no longer available for syncing") + c.loggerForGameServerSetKey(key).Info("GameServerSet is no longer available for syncing") return nil } return errors.Wrapf(err, "error retrieving GameServerSet %s from 
namespace %s", name, namespace) @@ -308,7 +317,7 @@ func (c *Controller) syncGameServerSet(key string) error { fields[key] = v.(int) + 1 } - c.logger. + c.loggerForGameServerSet(gsSet). WithField("targetReplicaCount", gsSet.Spec.Replicas). WithField("numServersToAdd", numServersToAdd). WithField("numServersToDelete", len(toDelete)). @@ -325,13 +334,13 @@ func (c *Controller) syncGameServerSet(key string) error { if numServersToAdd > 0 { if err := c.addMoreGameServers(gsSet, numServersToAdd); err != nil { - c.logger.WithError(err).Warning("error adding game servers") + c.loggerForGameServerSet(gsSet).WithError(err).Warning("error adding game servers") } } if len(toDelete) > 0 { if err := c.deleteGameServers(gsSet, toDelete); err != nil { - c.logger.WithError(err).Warning("error deleting game servers") + c.loggerForGameServerSet(gsSet).WithError(err).Warning("error deleting game servers") } } @@ -455,7 +464,7 @@ func isAllocated(gs *v1alpha1.GameServer) bool { // addMoreGameServers adds diff more GameServers to the set func (c *Controller) addMoreGameServers(gsSet *v1alpha1.GameServerSet, count int) error { - c.logger.WithField("count", count).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("Adding more gameservers") + c.loggerForGameServerSet(gsSet).WithField("count", count).Info("Adding more gameservers") return parallelize(newGameServersChannel(count, gsSet), maxCreationParalellism, func(gs *v1alpha1.GameServer) error { gs, err := c.gameServerGetter.GameServers(gs.Namespace).Create(gs) @@ -470,7 +479,7 @@ func (c *Controller) addMoreGameServers(gsSet *v1alpha1.GameServerSet, count int } func (c *Controller) deleteGameServers(gsSet *v1alpha1.GameServerSet, toDelete []*v1alpha1.GameServer) error { - c.logger.WithField("diff", len(toDelete)).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("Deleting gameservers") + c.loggerForGameServerSet(gsSet).WithField("diff", len(toDelete)).Info("Deleting gameservers") return 
parallelize(gameServerListToChannel(toDelete), maxDeletionParallelism, func(gs *v1alpha1.GameServer) error { // We should not delete the gameservers directly buy set their state to shutdown and let the gameserver controller to delete diff --git a/pkg/sdkserver/sdkserver.go b/pkg/sdkserver/sdkserver.go index e9a3230c7c..c9c9d85bfa 100644 --- a/pkg/sdkserver/sdkserver.go +++ b/pkg/sdkserver/sdkserver.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "agones.dev/agones/pkg/util/logfields" + "agones.dev/agones/pkg/apis/stable" stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" "agones.dev/agones/pkg/client/clientset/versioned" @@ -117,7 +119,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf } s.informerFactory = factory - s.logger = runtime.NewLoggerWithType(s) + s.logger = runtime.NewLoggerWithType(s).WithField("gsKey", namespace+"/"+gameServerName) gameServers.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, newObj interface{}) { @@ -153,9 +155,10 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf s.workerqueue = workerqueue.NewWorkerQueue( s.syncGameServer, s.logger, + logfields.GameServerKey, strings.Join([]string{stable.GroupName, s.namespace, s.gameServerName}, ".")) - s.logger.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar") + s.logger.Info("created GameServer sidecar") return s, nil } diff --git a/pkg/util/logfields/logfields.go b/pkg/util/logfields/logfields.go new file mode 100644 index 0000000000..4cf4c2d482 --- /dev/null +++ b/pkg/util/logfields/logfields.go @@ -0,0 +1,23 @@ +package logfields + +import ( + "github.com/sirupsen/logrus" +) + +// ResourceType identifies the type of a resource for the purpose of putting it in structural logs. +type ResourceType string + +// Identifiers used in logs for identifying Agones objects. 
+const ( + GameServerKey ResourceType = "gsKey" + GameServerSetKey ResourceType = "gssKey" + GameServerAllocationKey ResourceType = "gsaKey" + FleetKey ResourceType = "fleetKey" + FleetAllocationKey ResourceType = "faKey" + FleetAutoscalerKey ResourceType = "fasKey" +) + +// AugmentLogEntry creates derived log entry with a given resource identifier ("namespace/name") +func AugmentLogEntry(base *logrus.Entry, resourceType ResourceType, resourceID string) *logrus.Entry { + return base.WithField(string(resourceType), resourceID) +} diff --git a/pkg/util/workerqueue/workerqueue.go b/pkg/util/workerqueue/workerqueue.go index 35b4503745..898750432c 100644 --- a/pkg/util/workerqueue/workerqueue.go +++ b/pkg/util/workerqueue/workerqueue.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -42,8 +43,9 @@ type Handler func(string) error // with controllers and related and processing Kubernetes watched // events and synchronising resources type WorkerQueue struct { - logger *logrus.Entry - queue workqueue.RateLimitingInterface + logger *logrus.Entry + keyName string + queue workqueue.RateLimitingInterface // SyncHandler is exported to make testing easier (hack) SyncHandler Handler @@ -53,15 +55,16 @@ type WorkerQueue struct { } // NewWorkerQueue returns a new worker queue for a given name -func NewWorkerQueue(handler Handler, logger *logrus.Entry, name string) *WorkerQueue { - return NewWorkerQueueWithRateLimiter(handler, logger, name, workqueue.DefaultControllerRateLimiter()) +func NewWorkerQueue(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string) *WorkerQueue { + return NewWorkerQueueWithRateLimiter(handler, logger, keyName, queueName, workqueue.DefaultControllerRateLimiter()) } // NewWorkerQueueWithRateLimiter returns a new worker queue for a given name and a custom rate limiter. 
-func NewWorkerQueueWithRateLimiter(handler Handler, logger *logrus.Entry, name string, rateLimiter workqueue.RateLimiter) *WorkerQueue { +func NewWorkerQueueWithRateLimiter(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string, rateLimiter workqueue.RateLimiter) *WorkerQueue { return &WorkerQueue{ - logger: logger.WithField("queue", name), - queue: workqueue.NewNamedRateLimitingQueue(rateLimiter, name), + keyName: string(keyName), + logger: logger.WithField("queue", queueName), + queue: workqueue.NewNamedRateLimitingQueue(rateLimiter, queueName), SyncHandler: handler, } } @@ -77,7 +80,7 @@ func (wq *WorkerQueue) Enqueue(obj interface{}) { runtime.HandleError(wq.logger.WithField("obj", obj), err) return } - wq.logger.WithField("key", key).Info("Enqueuing key") + wq.logger.WithField(wq.keyName, key).Info("Enqueuing") wq.queue.AddRateLimited(key) } @@ -92,7 +95,7 @@ func (wq *WorkerQueue) EnqueueImmediately(obj interface{}) { runtime.HandleError(wq.logger.WithField("obj", obj), err) return } - wq.logger.WithField("key", key).Info("Enqueuing key immediately") + wq.logger.WithField(wq.keyName, key).Info("Enqueuing immediately") wq.queue.Add(key) } @@ -113,12 +116,12 @@ func (wq *WorkerQueue) processNextWorkItem() bool { } defer wq.queue.Done(obj) - wq.logger.WithField("obj", obj).Info("Processing obj") + wq.logger.WithField(wq.keyName, obj).Info("Processing") var key string var ok bool if key, ok = obj.(string); !ok { - runtime.HandleError(wq.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) + runtime.HandleError(wq.logger.WithField(wq.keyName, obj), errors.Errorf("expected string in queue, but got %T", obj)) // this is a bad entry, we don't want to reprocess wq.queue.Forget(obj) return true @@ -126,7 +129,7 @@ func (wq *WorkerQueue) processNextWorkItem() bool { if err := wq.SyncHandler(key); err != nil { // we don't forget here, because we want this to be retried via the queue - 
runtime.HandleError(wq.logger.WithField("obj", obj), err) + runtime.HandleError(wq.logger.WithField(wq.keyName, obj), err) wq.queue.AddRateLimited(obj) return true } diff --git a/pkg/util/workerqueue/workerqueue_test.go b/pkg/util/workerqueue/workerqueue_test.go index e55bdea8f7..f2c0707588 100644 --- a/pkg/util/workerqueue/workerqueue_test.go +++ b/pkg/util/workerqueue/workerqueue_test.go @@ -40,7 +40,7 @@ func TestWorkerQueueRun(t *testing.T) { return nil } - wq := NewWorkerQueue(syncHandler, logrus.WithField("source", "test"), "test") + wq := NewWorkerQueue(syncHandler, logrus.WithField("source", "test"), "testKey", "test") stop := make(chan struct{}) defer close(stop) @@ -70,7 +70,7 @@ func TestWorkerQueueHealthy(t *testing.T) { <-done return nil } - wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "test") + wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "testKey", "test") wq.Enqueue(cache.ExplicitKey("default/test")) stop := make(chan struct{}) @@ -108,7 +108,7 @@ func TestWorkQueueHealthCheck(t *testing.T) { handler := func(string) error { return nil } - wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "test") + wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "testKey", "test") health.AddLivenessCheck("test", wq.Healthy) server := httptest.NewServer(health)