From 486a5752b2c4b1f5ed56849cea5dcfef2581132c Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Wed, 8 Dec 2021 09:53:28 +0100 Subject: [PATCH] Tune workflow session flushing frequency --- automation/service/session.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/automation/service/session.go b/automation/service/session.go index 151b71f350..d833222f8b 100644 --- a/automation/service/session.go +++ b/automation/service/session.go @@ -50,6 +50,16 @@ type ( WaitFn func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, types.Stacktrace, error) ) +const ( + // when the state changes, state-change-handler is called and for non-fatal, + // non-interactive or non-delay steps (that are much more frequent) we need + // to limit how often the store is updated with the updated session info + // + // We use the size of the stacktrace and for every F (see the value of the constant) + // we flush the session info to the store. + sessionStateFlushFrequency = 1000 +) + func Session(log *zap.Logger, opt options.WorkflowOpt, ps promptSender) *session { return &session{ log: log, @@ -360,14 +370,9 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa return } - const ( - // how often do we flush to store - flushFrequency = 10 - ) - var ( // By default, we want to update session when new status is prompted, delayed, completed or failed - // But if status is active, we'll flush it every X frames (flushFrequency) + // But if status is active, we'll flush it every X frames (sessionStateFlushFrequency) update = true frame = state.MakeFrame() @@ -411,8 +416,8 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa ses.Status = types.SessionFailed default: - // force update on every 10 new frames but only when stacktrace is not nil - update = ses.RuntimeStacktrace != nil && len(ses.RuntimeStacktrace)%flushFrequency == 0 + // force update on every F new frames (F=sessionStateFlushFrequency) but only when stacktrace is not nil + update = ses.RuntimeStacktrace != nil && len(ses.RuntimeStacktrace)%sessionStateFlushFrequency == 0 } if !update { @@ -423,8 +428,6 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa if err := svc.store.UpsertAutomationSession(ctx, ses); err != nil { log.Error("failed to update session", zap.Error(err)) - } else { - log.Debug("session updated", zap.Stringer("status", ses.Status)) } } }