From ed328e4160e92db2db0669d04fa8c36690b0911a Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Wed, 10 Sep 2025 16:24:42 +0800 Subject: [PATCH 1/2] fix: high priority pipelines could be starved by `parallel/` label --- backend/server/services/pipeline.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 78e7df085c3..d41556479a2 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -261,8 +261,17 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, })) // prepare query to find an appropriate pipeline to execute pipeline = &models.Pipeline{} + // 1. find out the current highest priority in the queue + top_priority := 0 + where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}) + err = tx.First(&top_priority, dal.Select("priority"), dal.From(pipeline), where_status, dal.Orderby("priority DESC")) + if err != nil { + tx.Rollback() + } + // 2. pick the earlier runnable pipeline with the highest priority err = tx.First(pipeline, - dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}), + where_status, + dal.Where("priority = ?", top_priority), dal.Join( `left join _devlake_pipeline_labels ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND @@ -270,10 +279,10 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, _devlake_pipeline_labels.name in ?`, runningParallelLabels, ), - dal.Groupby("priority, id"), + dal.Groupby("id"), dal.Having("count(_devlake_pipeline_labels.name)=0"), dal.Select("id"), - dal.Orderby("priority DESC, id ASC"), + dal.Orderby("id ASC"), dal.Limit(1), ) if err == nil { From 545cc722ed7b1802d5718e0e9db3a33ca495a835 Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Wed, 10 Sep 2025 16:59:58 +0800 Subject: [PATCH 2/2] fix: failed on mysql --- backend/server/services/pipeline.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index d41556479a2..f6c770f9c6f 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -263,10 +263,14 @@ func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, pipeline = &models.Pipeline{} // 1. find out the current highest priority in the queue top_priority := 0 + var top_priorities []int where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}) - err = tx.First(&top_priority, dal.Select("priority"), dal.From(pipeline), where_status, dal.Orderby("priority DESC")) + err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1)) if err != nil { - tx.Rollback() + panic(err) + } + if len(top_priorities) > 0 { + top_priority = top_priorities[0] } // 2. pick the earlier runnable pipeline with the highest priority err = tx.First(pipeline,