From 37035984aed08d1c30c90c59a8aeb9d6d2449e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Thu, 8 Jun 2023 15:54:02 +0200 Subject: [PATCH] Fix known concurrent map access issues --- server/automation/service/workflow.go | 32 +++++++++++++++++---------- server/pkg/apigw/profiler/profiler.go | 15 +++++++++++-- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/server/automation/service/workflow.go b/server/automation/service/workflow.go index 9909855b38..2f76e73e7c 100644 --- a/server/automation/service/workflow.go +++ b/server/automation/service/workflow.go @@ -37,16 +37,17 @@ type ( log *zap.Logger // cache of workflows, graphs to workflow ID (key, uint64) - cache map[uint64]*wfCacheItem + cache map[uint64]*wfCacheItem + muxCache *sync.RWMutex // handle to workflow index - wIndex map[string]uint64 + wIndex map[string]uint64 + muxWIndex *sync.RWMutex // workflow function registry reg *registry corredorOpt options.CorredorOpt - mux *sync.RWMutex parser expr.Parsable } @@ -108,7 +109,8 @@ func Workflow(log *zap.Logger, corredorOpt options.CorredorOpt, opt options.Work eventbus: eventbus.Service(), cache: make(map[uint64]*wfCacheItem), wIndex: make(map[string]uint64), - mux: &sync.RWMutex{}, + muxCache: &sync.RWMutex{}, + muxWIndex: &sync.RWMutex{}, parser: expr.NewParser(), reg: Registry(), corredorOpt: corredorOpt, @@ -376,6 +378,9 @@ func (svc *workflow) updater(ctx context.Context, workflowID uint64, action func } func (svc *workflow) handleToID(h string) uint64 { + svc.muxWIndex.RLock() + defer svc.muxWIndex.RUnlock() + return svc.wIndex[h] } @@ -510,11 +515,13 @@ func (svc *workflow) Load(ctx context.Context) error { g *wfexec.Graph runAs intAuth.Identifiable ) - if err != nil { return err } + svc.muxWIndex.Lock() + defer svc.muxWIndex.Unlock() + for _, wf := range set { svc.wIndex[wf.Handle] = wf.ID @@ -530,8 +537,9 @@ func (svc *workflow) Load(ctx context.Context) error { // updateCache func (svc *workflow) updateCache(wf *types.Workflow, runAs intAuth.Identifiable, g *wfexec.Graph) { - defer svc.mux.Unlock() - svc.mux.Lock() + defer svc.muxCache.Unlock() + svc.muxCache.Lock() + if wf.Executable() { svc.cache[wf.ID] = &wfCacheItem{g: g, wf: wf, runAs: runAs} } else { @@ -553,14 +561,14 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl ) err := func() (err error) { - svc.mux.Lock() + svc.muxCache.Lock() if nil == svc.cache[workflowID] || nil == svc.cache[workflowID].wf { - svc.mux.Unlock() + svc.muxCache.Unlock() return WorkflowErrNotFound() } wf := svc.cache[workflowID].wf - svc.mux.Unlock() + svc.muxCache.Unlock() wap.setWorkflow(wf) @@ -702,8 +710,8 @@ func (svc *workflow) exec(ctx context.Context, wf *types.Workflow, p types.Workf return nil, 0, wf.Issues } - defer svc.mux.Unlock() - svc.mux.Lock() + defer svc.muxCache.Unlock() + svc.muxCache.Lock() if svc.cache[wf.ID] == nil { return nil, 0, WorkflowErrInvalidID() diff --git a/server/pkg/apigw/profiler/profiler.go b/server/pkg/apigw/profiler/profiler.go index 2acd577d95..77b20f0d25 100644 --- a/server/pkg/apigw/profiler/profiler.go +++ b/server/pkg/apigw/profiler/profiler.go @@ -2,6 +2,7 @@ package profiler import ( "net/http" + "sync" "time" h "github.com/cortezaproject/corteza/server/pkg/http" @@ -9,12 +10,13 @@ import ( type ( Profiler struct { - l Hits + mux sync.RWMutex + l Hits } ) func New() *Profiler { - return &Profiler{make(Hits)} + return &Profiler{l: make(Hits)} } func (p *Profiler) Hit(r *h.Request) (h *Hit) { @@ -29,6 +31,9 @@ func (p *Profiler) Hit(r *h.Request) (h *Hit) { } func (p *Profiler) Push(h *Hit) (id string) { + p.mux.Lock() + defer p.mux.Unlock() + if h.Tf == nil { n := time.Now() d := n.Sub(*h.Ts) @@ -46,6 +51,9 @@ func (p *Profiler) Push(h *Hit) (id string) { } func (p *Profiler) Hits(s Sort) Hits { + p.mux.RLock() + defer p.mux.RUnlock() + ll := p.l.Filter(func(k string, v *Hit) bool { var b bool = true @@ -64,6 +72,9 @@ func (p *Profiler) Hits(s Sort) Hits { } func (p *Profiler) Purge(f *PurgeFilter) { + p.mux.Lock() + defer p.mux.Unlock() + if f.RouteID == 0 { p.l = make(Hits, 0) return