Skip to content

Commit

Permalink
Unified logging of resource identifiers so that we can reliably get a…
Browse files Browse the repository at this point in the history
…n entire history of a resource in stack driver.

Well-known identifiers of resources types are defined in logfields package.
  • Loading branch information
jkowalski committed Feb 22, 2019
1 parent 97f1935 commit 0cff425
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 131 deletions.
35 changes: 21 additions & 14 deletions pkg/fleetautoscalers/controller.go
Expand Up @@ -20,12 +20,14 @@ import (
"time"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/client/clientset/versioned"
getterv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/crd"
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/webhooks"
"agones.dev/agones/pkg/util/workerqueue"
Expand All @@ -48,7 +50,7 @@ import (

// Controller is a the FleetAutoscaler controller
type Controller struct {
logger *logrus.Entry
baseLogger *logrus.Entry
crdGetter v1beta1.CustomResourceDefinitionInterface
fleetGetter getterv1alpha1.FleetsGetter
fleetLister listerv1alpha1.FleetLister
Expand Down Expand Up @@ -79,12 +81,12 @@ func NewController(
fleetAutoscalerLister: agonesInformer.FleetAutoscalers().Lister(),
fleetAutoscalerSynced: fasInformer.HasSynced,
}
c.logger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleetAutoscaler, c.logger, stable.GroupName+".FleetAutoscalerController")
c.baseLogger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleetAutoscaler, c.baseLogger, logfields.FleetAutoscalerKey, stable.GroupName+".FleetAutoscalerController")
health.AddLivenessCheck("fleetautoscaler-workerqueue", healthcheck.Check(c.workerqueue.Healthy))

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(c.logger.Infof)
eventBroadcaster.StartLogging(c.baseLogger.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "fleetautoscaler-controller"})

Expand All @@ -105,12 +107,12 @@ func NewController(
// Run the FleetAutoscaler controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetautoscalers."+stable.GroupName, c.logger)
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetautoscalers."+stable.GroupName, c.baseLogger)
if err != nil {
return err
}

c.logger.Info("Wait for cache sync")
c.baseLogger.Info("Wait for cache sync")
if !cache.WaitForCacheSync(stop, c.fleetAutoscalerSynced) {
return errors.New("failed to wait for caches to sync")
}
Expand All @@ -119,14 +121,22 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
return nil
}

func (c *Controller) loggerForFleetAutoscalerKey(key string) *logrus.Entry {
return logfields.AugmentLogEntry(c.baseLogger, logfields.FleetAutoscalerKey, key)
}

func (c *Controller) loggerForFleetAutoscaler(fas *v1alpha1.FleetAutoscaler) *logrus.Entry {
return c.loggerForFleetAutoscalerKey(fas.Namespace+"/"+fas.Name).WithField("fas", fas)
}

// validationHandler will intercept when a FleetAutoscaler is created, and
// validate its settings.
func (c *Controller) validationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) {
c.logger.WithField("review", review).Info("validationHandler")
obj := review.Request.Object
fas := &stablev1alpha1.FleetAutoscaler{}
err := json.Unmarshal(obj.Raw, fas)
if err != nil {
c.baseLogger.WithField("review", review).WithError(err).Info("validationHandler")
return review, errors.Wrapf(err, "error unmarshalling original FleetAutoscaler json: %s", obj.Raw)
}

Expand Down Expand Up @@ -154,20 +164,20 @@ func (c *Controller) validationHandler(review admv1beta1.AdmissionReview) (admv1
// syncFleetAutoscaler scales the attached fleet and
// synchronizes the FleetAutoscaler CRD
func (c *Controller) syncFleetAutoscaler(key string) error {
c.logger.WithField("key", key).Info("Synchronising")
c.loggerForFleetAutoscalerKey(key).Info("Synchronising")

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// don't return an error, as we don't want this retried
runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
runtime.HandleError(c.loggerForFleetAutoscalerKey(key), errors.Wrapf(err, "invalid resource key"))
return nil
}

fas, err := c.fleetAutoscalerLister.FleetAutoscalers(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
c.logger.WithField("key", key).Info(fmt.Sprintf("FleetAutoscaler %s from namespace %s is no longer available for syncing", name, namespace))
c.loggerForFleetAutoscalerKey(key).Info(fmt.Sprintf("FleetAutoscaler %s from namespace %s is no longer available for syncing", name, namespace))
return nil
}
return errors.Wrapf(err, "error retrieving FleetAutoscaler %s from namespace %s", name, namespace)
Expand All @@ -177,10 +187,7 @@ func (c *Controller) syncFleetAutoscaler(key string) error {
fleet, err := c.fleetLister.Fleets(namespace).Get(fas.Spec.FleetName)
if err != nil {
if k8serrors.IsNotFound(err) {
logrus.WithError(err).WithField("fleetAutoscaler", fas.Name).
WithField("fleet", fas.Spec.FleetName).
WithField("namespace", namespace).
Warn("Could not find fleet for autoscaler. Skipping.")
c.loggerForFleetAutoscaler(fas).Warn("Could not find fleet for autoscaler. Skipping.")

c.recorder.Eventf(fas, corev1.EventTypeWarning, "FailedGetFleet",
"could not fetch fleet: %s", fas.Spec.FleetName)
Expand Down
50 changes: 29 additions & 21 deletions pkg/fleets/controller.go
Expand Up @@ -26,6 +26,7 @@ import (
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/crd"
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/webhooks"
"agones.dev/agones/pkg/util/workerqueue"
Expand All @@ -50,7 +51,7 @@ import (

// Controller is a the GameServerSet controller
type Controller struct {
logger *logrus.Entry
baseLogger *logrus.Entry
crdGetter v1beta1.CustomResourceDefinitionInterface
gameServerSetGetter getterv1alpha1.GameServerSetsGetter
gameServerSetLister listerv1alpha1.GameServerSetLister
Expand Down Expand Up @@ -87,12 +88,12 @@ func NewController(
fleetSynced: fInformer.HasSynced,
}

c.logger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleet, c.logger, stable.GroupName+".FleetController")
c.baseLogger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleet, c.baseLogger, logfields.FleetKey, stable.GroupName+".FleetController")
health.AddLivenessCheck("fleet-workerqueue", healthcheck.Check(c.workerqueue.Healthy))

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(c.logger.Infof)
eventBroadcaster.StartLogging(c.baseLogger.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "fleet-controller"})

Expand Down Expand Up @@ -126,7 +127,7 @@ func NewController(
// Should only be called on fleet create operations.
// nolint:dupl
func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) {
c.logger.WithField("review", review).Info("creationMutationHandler")
c.baseLogger.WithField("review", review).Info("creationMutationHandler")

obj := review.Request.Object
fleet := &stablev1alpha1.Fleet{}
Expand Down Expand Up @@ -154,7 +155,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview)
return review, errors.Wrapf(err, "error creating json for patch for Fleet %s", fleet.ObjectMeta.Name)
}

c.logger.WithField("fleet", fleet.ObjectMeta.Name).WithField("patch", string(jsn)).Infof("patch created!")
c.loggerForFleet(fleet).WithField("patch", string(jsn)).Infof("patch created!")

pt := admv1beta1.PatchTypeJSONPatch
review.Response.PatchType = &pt
Expand All @@ -166,7 +167,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview)
// creationValidationHandler that validates a Fleet when it is created
// Should only be called on Fleet create and Update operations.
func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) {
c.logger.WithField("review", review).Info("creationValidationHandler")
c.baseLogger.WithField("review", review).Info("creationValidationHandler")

obj := review.Request.Object
fleet := &stablev1alpha1.Fleet{}
Expand All @@ -191,7 +192,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview
Details: &details,
}

c.logger.WithField("review", review).Info("Invalid Fleet")
c.loggerForFleet(fleet).WithField("review", review).Info("Invalid Fleet")
return review, nil
}

Expand All @@ -201,12 +202,12 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview
// Run the Fleet controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.logger)
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.baseLogger)
if err != nil {
return err
}

c.logger.Info("Wait for cache sync")
c.baseLogger.Info("Wait for cache sync")
if !cache.WaitForCacheSync(stop, c.gameServerSetSynced, c.fleetSynced) {
return errors.New("failed to wait for caches to sync")
}
Expand All @@ -215,6 +216,14 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
return nil
}

func (c *Controller) loggerForFleetKey(key string) *logrus.Entry {
return logfields.AugmentLogEntry(c.baseLogger, logfields.FleetKey, key)
}

func (c *Controller) loggerForFleet(f *v1alpha1.Fleet) *logrus.Entry {
return c.loggerForFleetKey(f.Namespace+"/"+f.Name).WithField("fleet", f)
}

// gameServerSetEventHandler enqueues the owning Fleet for this GameServerSet,
// assuming that it has one
func (c *Controller) gameServerSetEventHandler(obj interface{}) {
Expand All @@ -227,9 +236,9 @@ func (c *Controller) gameServerSetEventHandler(obj interface{}) {
fleet, err := c.fleetLister.Fleets(gsSet.ObjectMeta.Namespace).Get(ref.Name)
if err != nil {
if k8serrors.IsNotFound(err) {
c.logger.WithField("ref", ref).Info("Owner Fleet no longer available for syncing")
c.baseLogger.WithField("ref", ref).Info("Owner Fleet no longer available for syncing")
} else {
runtime.HandleError(c.logger.WithField("fleet", fleet.ObjectMeta.Name).WithField("ref", ref),
runtime.HandleError(c.loggerForFleet(fleet).WithField("ref", ref),
errors.Wrap(err, "error retrieving GameServerSet owner"))
}
return
Expand All @@ -240,20 +249,20 @@ func (c *Controller) gameServerSetEventHandler(obj interface{}) {
// syncFleet synchronised the fleet CRDs and configures/updates
// backing GameServerSets
func (c *Controller) syncFleet(key string) error {
c.logger.WithField("key", key).Info("Synchronising")
c.loggerForFleetKey(key).Info("Synchronising")

// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// don't return an error, as we don't want this retried
runtime.HandleError(c.logger.WithField("key", key), errors.Wrapf(err, "invalid resource key"))
runtime.HandleError(c.loggerForFleetKey(key), errors.Wrapf(err, "invalid resource key"))
return nil
}

fleet, err := c.fleetLister.Fleets(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
c.logger.WithField("key", key).Info("Fleet is no longer available for syncing")
c.loggerForFleetKey(key).Info("Fleet is no longer available for syncing")
return nil
}
return errors.Wrapf(err, "error retrieving fleet %s from namespace %s", name, namespace)
Expand All @@ -267,7 +276,7 @@ func (c *Controller) syncFleet(key string) error {
active, rest := c.filterGameServerSetByActive(fleet, list)
// if there isn't an active gameServerSet, create one (but don't persist yet)
if active == nil {
c.logger.WithField("fleet", fleet.ObjectMeta.Name).Info("could not find active GameServerSet, creating")
c.loggerForFleet(fleet).Info("could not find active GameServerSet, creating")
active = fleet.GameServerSet()
}

Expand Down Expand Up @@ -358,7 +367,7 @@ func (c *Controller) deleteEmptyGameServerSets(fleet *stablev1alpha1.Fleet, list
func (c *Controller) recreateDeployment(fleet *stablev1alpha1.Fleet, rest []*stablev1alpha1.GameServerSet) (int32, error) {
for _, gsSet := range rest {
if gsSet.Spec.Replicas != 0 {
c.logger.WithField("gameserverset", gsSet.ObjectMeta.Name).Info("applying recreate deployment: scaling to 0")
c.loggerForFleet(fleet).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("applying recreate deployment: scaling to 0")
gsSetCopy := gsSet.DeepCopy()
gsSetCopy.Spec.Replicas = 0
if _, err := c.gameServerSetGetter.GameServerSets(gsSetCopy.ObjectMeta.Namespace).Update(gsSetCopy); err != nil {
Expand Down Expand Up @@ -421,7 +430,7 @@ func (c *Controller) rollingUpdateActive(fleet *stablev1alpha1.Fleet, active *st
replicas = fleet.LowerBoundReplicas(replicas - sumAllocated)
}

c.logger.WithField("gameserverset", active.ObjectMeta.Name).WithField("replicas", replicas).
c.loggerForFleet(fleet).WithField("gameserverset", active.ObjectMeta.Name).WithField("replicas", replicas).
Info("applying rolling update to active gameserverset")

return replicas, nil
Expand Down Expand Up @@ -454,7 +463,7 @@ func (c *Controller) rollingUpdateRest(fleet *stablev1alpha1.Fleet, rest []*stab
gsSetCopy := gsSet.DeepCopy()
gsSetCopy.Spec.Replicas = fleet.LowerBoundReplicas(gsSetCopy.Spec.Replicas - unavailable)

c.logger.WithField("gameserverset", gsSet.ObjectMeta.Name).WithField("replicas", gsSetCopy.Spec.Replicas).
c.loggerForFleet(fleet).WithField("gameserverset", gsSet.ObjectMeta.Name).WithField("replicas", gsSetCopy.Spec.Replicas).
Info("applying rolling update to inactive gameserverset")

if _, err := c.gameServerSetGetter.GameServerSets(gsSetCopy.ObjectMeta.Namespace).Update(gsSetCopy); err != nil {
Expand All @@ -473,8 +482,7 @@ func (c *Controller) rollingUpdateRest(fleet *stablev1alpha1.Fleet, rest []*stab
// updateFleetStatus gets the GameServerSets for this Fleet and then
// calculates the counts for the status, and updates the Fleet
func (c *Controller) updateFleetStatus(fleet *stablev1alpha1.Fleet) error {

c.logger.WithField("key", fleet.Name).Info("Update Fleet Status")
c.loggerForFleet(fleet).Info("Update Fleet Status")

list, err := ListGameServerSetsByFleetOwner(c.gameServerSetLister, fleet)
if err != nil {
Expand Down

0 comments on commit 0cff425

Please sign in to comment.