Skip to content

Commit

Permalink
Fix known concurrent map access issues
Browse files Browse the repository at this point in the history
  • Loading branch information
tjerman committed Jun 8, 2023
1 parent c93cd59 commit 3703598
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
32 changes: 20 additions & 12 deletions server/automation/service/workflow.go
Expand Up @@ -37,16 +37,17 @@ type (
log *zap.Logger

// cache of workflows, graphs to workflow ID (key, uint64)
cache map[uint64]*wfCacheItem
cache map[uint64]*wfCacheItem
muxCache *sync.RWMutex

// handle to workflow index
wIndex map[string]uint64
wIndex map[string]uint64
muxWIndex *sync.RWMutex

// workflow function registry
reg *registry
corredorOpt options.CorredorOpt

mux *sync.RWMutex
parser expr.Parsable
}

Expand Down Expand Up @@ -108,7 +109,8 @@ func Workflow(log *zap.Logger, corredorOpt options.CorredorOpt, opt options.Work
eventbus: eventbus.Service(),
cache: make(map[uint64]*wfCacheItem),
wIndex: make(map[string]uint64),
mux: &sync.RWMutex{},
muxCache: &sync.RWMutex{},
muxWIndex: &sync.RWMutex{},
parser: expr.NewParser(),
reg: Registry(),
corredorOpt: corredorOpt,
Expand Down Expand Up @@ -376,6 +378,9 @@ func (svc *workflow) updater(ctx context.Context, workflowID uint64, action func
}

func (svc *workflow) handleToID(h string) uint64 {
svc.muxWIndex.RLock()
defer svc.muxWIndex.RUnlock()

return svc.wIndex[h]
}

Expand Down Expand Up @@ -510,11 +515,13 @@ func (svc *workflow) Load(ctx context.Context) error {
g *wfexec.Graph
runAs intAuth.Identifiable
)

if err != nil {
return err
}

svc.muxWIndex.Lock()
defer svc.muxWIndex.Unlock()

for _, wf := range set {
svc.wIndex[wf.Handle] = wf.ID

Expand All @@ -530,8 +537,9 @@ func (svc *workflow) Load(ctx context.Context) error {

// updateCache
func (svc *workflow) updateCache(wf *types.Workflow, runAs intAuth.Identifiable, g *wfexec.Graph) {
defer svc.mux.Unlock()
svc.mux.Lock()
defer svc.muxCache.Unlock()
svc.muxCache.Lock()

if wf.Executable() {
svc.cache[wf.ID] = &wfCacheItem{g: g, wf: wf, runAs: runAs}
} else {
Expand All @@ -553,14 +561,14 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
)

err := func() (err error) {
svc.mux.Lock()
svc.muxCache.Lock()
if nil == svc.cache[workflowID] || nil == svc.cache[workflowID].wf {
svc.mux.Unlock()
svc.muxCache.Unlock()
return WorkflowErrNotFound()
}

wf := svc.cache[workflowID].wf
svc.mux.Unlock()
svc.muxCache.Unlock()

wap.setWorkflow(wf)

Expand Down Expand Up @@ -702,8 +710,8 @@ func (svc *workflow) exec(ctx context.Context, wf *types.Workflow, p types.Workf
return nil, 0, wf.Issues
}

defer svc.mux.Unlock()
svc.mux.Lock()
defer svc.muxCache.Unlock()
svc.muxCache.Lock()

if svc.cache[wf.ID] == nil {
return nil, 0, WorkflowErrInvalidID()
Expand Down
15 changes: 13 additions & 2 deletions server/pkg/apigw/profiler/profiler.go
Expand Up @@ -2,19 +2,21 @@ package profiler

import (
"net/http"
"sync"
"time"

h "github.com/cortezaproject/corteza/server/pkg/http"
)

type (
Profiler struct {
l Hits
mux sync.RWMutex
l Hits
}
)

func New() *Profiler {
return &Profiler{make(Hits)}
return &Profiler{l: make(Hits)}
}

func (p *Profiler) Hit(r *h.Request) (h *Hit) {
Expand All @@ -29,6 +31,9 @@ func (p *Profiler) Hit(r *h.Request) (h *Hit) {
}

func (p *Profiler) Push(h *Hit) (id string) {
p.mux.Lock()
defer p.mux.Unlock()

if h.Tf == nil {
n := time.Now()
d := n.Sub(*h.Ts)
Expand All @@ -46,6 +51,9 @@ func (p *Profiler) Push(h *Hit) (id string) {
}

func (p *Profiler) Hits(s Sort) Hits {
p.mux.RLock()
defer p.mux.RUnlock()

ll := p.l.Filter(func(k string, v *Hit) bool {
var b bool = true

Expand All @@ -64,6 +72,9 @@ func (p *Profiler) Hits(s Sort) Hits {
}

func (p *Profiler) Purge(f *PurgeFilter) {
p.mux.Lock()
defer p.mux.Unlock()

if f.RouteID == 0 {
p.l = make(Hits, 0)
return
Expand Down

0 comments on commit 3703598

Please sign in to comment.