Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/server/services/blueprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func MakePlanForBlueprint(blueprint *models.Blueprint, syncPolicy *models.SyncPo
if err != nil {
return nil, err
}
return SequencializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil
return SequentializePipelinePlans(blueprint.BeforePlan, plan, blueprint.AfterPlan), nil
}

// ParallelizePipelinePlans merges multiple pipelines into one unified plan
Expand All @@ -388,9 +388,9 @@ func ParallelizePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan
return merged
}

// SequencializePipelinePlans merges multiple pipelines into one unified plan
// by assuming they must be executed in sequencial order
func SequencializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan {
// SequentializePipelinePlans merges multiple pipelines into one unified plan
// by assuming they must be executed in sequential order
func SequentializePipelinePlans(plans ...models.PipelinePlan) models.PipelinePlan {
merged := make(models.PipelinePlan, 0)
// iterate all pipelineTasks and try to merge them into `merged`
for _, plan := range plans {
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/blueprint_makeplan_v200.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func GeneratePlanJsonV200(
}
}
}
plan := SequencializePipelinePlans(
plan := SequentializePipelinePlans(
planForProjectMapping,
ParallelizePipelinePlans(sourcePlans...),
ParallelizePipelinePlans(metricPlans...),
Expand Down
7 changes: 6 additions & 1 deletion backend/server/services/pipeline_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package services

import (
"fmt"
"sync"
"time"

"github.com/apache/incubator-devlake/core/dal"
Expand All @@ -27,8 +28,12 @@ import (
"github.com/apache/incubator-devlake/helpers/dbhelper"
)

var createDbPipelineLock sync.Mutex

// CreateDbPipeline returns a NewPipeline
func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipeline, err errors.Error) {
createDbPipelineLock.Lock()
defer createDbPipelineLock.Unlock()
pipeline = &models.Pipeline{}
txHelper := dbhelper.NewTxHelper(basicRes, &err)
defer txHelper.End()
Expand All @@ -49,7 +54,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
dal.From(&models.Pipeline{}),
dal.Where("blueprint_id = ? AND status IN ?", newPipeline.BlueprintId, models.PendingTaskStatus),
))
// some pipeline is ruunning , get the detail and output them.
// some pipeline is running, get the detail and output them.
if count > 0 {
return nil, errors.BadInput.New("there are pending pipelines of current blueprint already")
}
Expand Down