Skip to content

Commit

Permalink
fix: Concurrent map read and map write in agent. Fixes #9685 (#9689)
Browse files Browse the repository at this point in the history
Signed-off-by: yangxue.chen <chenyangxuehdu@126.com>

Signed-off-by: yangxue.chen <chenyangxuehdu@126.com>
Co-authored-by: yangxue.chen <chenyangxuehdu@126.com>
  • Loading branch information
chenyangxueHDU and yangxue.chen committed Oct 8, 2022
1 parent 1bbdf0d commit a8e37e9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
12 changes: 6 additions & 6 deletions workflow/executor/agent.go
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -40,7 +41,7 @@ type AgentExecutor struct {
WorkflowInterface workflow.Interface
RESTClient rest.Interface
Namespace string
consideredTasks map[string]bool
consideredTasks *sync.Map
plugins []executorplugins.TemplateExecutor
}

Expand All @@ -54,7 +55,7 @@ func NewAgentExecutor(clientSet kubernetes.Interface, restClient rest.Interface,
Namespace: namespace,
WorkflowName: workflowName,
WorkflowInterface: workflow.NewForConfigOrDie(config),
consideredTasks: make(map[string]bool),
consideredTasks: &sync.Map{},
plugins: plugins,
}
}
Expand Down Expand Up @@ -126,13 +127,11 @@ func (ae *AgentExecutor) taskWorker(ctx context.Context, taskQueue chan task, re

// Do not work on tasks that have already been considered once, to prevent calling an endpoint more
// than once unintentionally.
if _, ok := ae.consideredTasks[nodeID]; ok {
if _, ok := ae.consideredTasks.LoadOrStore(nodeID, true); ok {
log.Info("Task is already considered")
continue
}

ae.consideredTasks[nodeID] = true

log.Info("Processing task")
result, requeue, err := ae.processTask(ctx, tmpl)
if err != nil {
Expand All @@ -155,7 +154,8 @@ func (ae *AgentExecutor) taskWorker(ctx context.Context, taskQueue chan task, re
}
if requeue > 0 {
time.AfterFunc(requeue, func() {
delete(ae.consideredTasks, nodeID)
ae.consideredTasks.Delete(nodeID)

taskQueue <- task
})
}
Expand Down
3 changes: 2 additions & 1 deletion workflow/executor/agent_test.go
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -11,7 +12,7 @@ import (

func TestUnsupportedTemplateTaskWorker(t *testing.T) {
ae := &AgentExecutor{
consideredTasks: map[string]bool{},
consideredTasks: &sync.Map{},
}
taskQueue := make(chan task)
defer close(taskQueue)
Expand Down

0 comments on commit a8e37e9

Please sign in to comment.