From bc029f594ae3537a8b4d5dc0f0a477ded4d24c41 Mon Sep 17 00:00:00 2001 From: d4x1 <1507509064@qq.com> Date: Fri, 12 Jul 2024 18:22:37 +0800 Subject: [PATCH] fix(framework): add lock when creating pipeline --- backend/server/services/blueprint.go | 8 ++++---- backend/server/services/blueprint_makeplan_v200.go | 2 +- backend/server/services/pipeline_helper.go | 7 ++++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go index 28da2215922..cf7d20ccffb 100644 --- a/backend/server/services/blueprint.go +++ b/backend/server/services/blueprint.go @@ -367,7 +367,7 @@ func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy *models.SyncPo if err != nil { return nil, err } - return SequencializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil + return SequentializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil } // ParallelizePipelinePlans merges multiple pipelines into one unified plan @@ -388,9 +388,9 @@ func ParallelizePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan return merged } -// SequencializePipelinePlans merges multiple pipelines into one unified plan -// by assuming they must be executed in sequencial order -func SequencializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan { +// SequentializePipelinePlans merges multiple pipelines into one unified plan +// by assuming they must be executed in sequential order +func SequentializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan { merged := make(models.PipelinePlan, 0) // iterate all pipelineTasks and try to merge them into `merged` for _, plan := range plans { diff --git a/backend/server/services/blueprint_makeplan_v200.go b/backend/server/services/blueprint_makeplan_v200.go index 045061e5971..bd89d7de3e2 100644 --- a/backend/server/services/blueprint_makeplan_v200.go +++ b/backend/server/services/blueprint_makeplan_v200.go @@ -132,7 +132,7 @@ func GeneratePlanJsonV200( } } } - plan := SequencializePipelinePlans( + plan := SequentializePipelinePlans( planForProjectMapping, ParallelizePipelinePlans(sourcePlans...), ParallelizePipelinePlans(metricPlans...), diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go index eec6a8c3498..899110f5cae 100644 --- a/backend/server/services/pipeline_helper.go +++ b/backend/server/services/pipeline_helper.go @@ -19,6 +19,7 @@ package services import ( "fmt" + "sync" "time" "github.com/apache/incubator-devlake/core/dal" @@ -27,8 +28,12 @@ import ( "github.com/apache/incubator-devlake/helpers/dbhelper" ) +var createDbPipelineLock sync.Mutex + // CreateDbPipeline returns a NewPipeline func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipeline, err errors.Error) { + createDbPipelineLock.Lock() + defer createDbPipelineLock.Unlock() pipeline = &models.Pipeline{} txHelper := dbhelper.NewTxHelper(basicRes, &err) defer txHelper.End() @@ -49,7 +54,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin dal.From(&models.Pipeline{}), dal.Where("blueprint_id = ? AND status IN ?", newPipeline.BlueprintId, models.PendingTaskStatus), )) - // some pipeline is ruunning , get the detail and output them. + // some pipeline is running, get the detail and output them. if count > 0 { return nil, errors.BadInput.New("there are pending pipelines of current blueprint already") }