From 6af5094de09d5f09a032c8ace075b16c049d4e41 Mon Sep 17 00:00:00 2001 From: Volodymyr Zahorniak Date: Mon, 8 Jun 2026 10:44:55 +0300 Subject: [PATCH 1/2] refactor(plugin-circleci): extract unfinished-jobs input clauses into a helper Signed-off-by: Volodymyr Zahorniak --- .../plugins/circleci/tasks/job_collector.go | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go index fd1d78286bf..92b815fbe88 100644 --- a/backend/plugins/circleci/tasks/job_collector.go +++ b/backend/plugins/circleci/tasks/job_collector.go @@ -41,6 +41,20 @@ var CollectJobsMeta = plugin.SubTaskMeta{ DomainTypes: []string{plugin.DOMAIN_TYPE_CICD}, } +// UnfinishedJobsInputClauses returns the DAL clauses that select the workflows whose +// jobs are still in a non-terminal status and therefore need their job details +// recollected by the CollectJobs "unfinished details" collector. +func UnfinishedJobsInputClauses(connectionId uint64, projectSlug string) []dal.Clause { + return []dal.Clause{ + dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once + dal.From(&models.CircleciJob{}), + dal.Where( + "connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", + connectionId, projectSlug, + ), + } +} + func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_JOB_TABLE) logger := taskCtx.GetLogger() @@ -94,14 +108,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { AfterResponse: ignoreDeletedBuilds, }, BuildInputIterator: func() (api.Iterator, errors.Error) { - clauses := []dal.Clause{ - dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once - dal.From(&models.CircleciJob{}), - dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug), - } - db := taskCtx.GetDal() - cursor, err := db.Cursor(clauses...) + cursor, err := db.Cursor(UnfinishedJobsInputClauses(data.Options.ConnectionId, data.Options.ProjectSlug)...) if err != nil { return nil, err } From bca774d374767f4791256522b971768b4d82894f Mon Sep 17 00:00:00 2001 From: Volodymyr Zahorniak Date: Mon, 8 Jun 2026 10:46:10 +0300 Subject: [PATCH 2/2] fix(plugin-circleci): populate workflow id for unfinished-job collection (#8907) The collectJobs 'unfinished details' collector built its URL from '/v2/workflow/{{ .Input.Id }}/job' but its iterator selected 'DISTINCT workflow_id' into a models.CircleciJob, leaving .Id empty and producing '/v2/workflow//job' (HTTP 500) whenever a job was running/queued/on_hold. Alias the projection to 'workflow_id AS id' so .Id carries the workflow id, mirroring the new-records collector. Adds an e2e regression test. Signed-off-by: Volodymyr Zahorniak --- .../circleci/e2e/job_collector_test.go | 82 +++++++++++++++++++ .../plugins/circleci/tasks/job_collector.go | 2 +- 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 backend/plugins/circleci/e2e/job_collector_test.go diff --git a/backend/plugins/circleci/e2e/job_collector_test.go b/backend/plugins/circleci/e2e/job_collector_test.go new file mode 100644 index 00000000000..ed2f014217f --- /dev/null +++ b/backend/plugins/circleci/e2e/job_collector_test.go @@ -0,0 +1,82 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "reflect" + "sort" + "testing" + + "github.com/apache/incubator-devlake/helpers/e2ehelper" + "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/plugins/circleci/impl" + "github.com/apache/incubator-devlake/plugins/circleci/models" + "github.com/apache/incubator-devlake/plugins/circleci/tasks" + "github.com/stretchr/testify/assert" +) + +// TestCircleciUnfinishedJobsInputIterator is a regression test for +// https://github.com/apache/devlake/issues/8907. The "collect unfinished job +// details" collector builds its URL from "/v2/workflow/{{ .Input.Id }}/job" while +// scanning rows into a models.CircleciJob. Its input query must therefore expose the +// workflow id in the row's Id field; a bare "DISTINCT workflow_id" left Id empty and +// produced "/v2/workflow//job" (HTTP 500). This test runs the production query +// (tasks.UnfinishedJobsInputClauses) through the real iterator and asserts each +// yielded row's Id is the workflow id, that results are DISTINCT, and that the +// status/connection filters hold. +func TestCircleciUnfinishedJobsInputIterator(t *testing.T) { + var circleci impl.Circleci + dataflowTester := e2ehelper.NewDataFlowTester(t, "circleci", circleci) + + const projectSlug = "github/test/repo" + dataflowTester.FlushTabler(&models.CircleciJob{}) + + seed := []models.CircleciJob{ + {ConnectionId: 1, WorkflowId: "wf-onhold", Id: "job-1", ProjectSlug: projectSlug, Status: "on_hold"}, + {ConnectionId: 1, WorkflowId: "wf-onhold", Id: "job-2", ProjectSlug: projectSlug, Status: "running"}, // same workflow -> DISTINCT + {ConnectionId: 1, WorkflowId: "wf-queued", Id: "job-3", ProjectSlug: projectSlug, Status: "queued"}, + {ConnectionId: 1, WorkflowId: "wf-success", Id: "job-4", ProjectSlug: projectSlug, Status: "success"}, // terminal -> excluded + {ConnectionId: 2, WorkflowId: "wf-otherconn", Id: "job-5", ProjectSlug: projectSlug, Status: "on_hold"}, // other connection -> excluded + } + for i := range seed { + assert.Nil(t, dataflowTester.Dal.Create(&seed[i])) + } + + cursor, err := dataflowTester.Dal.Cursor(tasks.UnfinishedJobsInputClauses(1, projectSlug)...) + assert.Nil(t, err) + iter, err := api.NewDalCursorIterator(dataflowTester.Dal, cursor, reflect.TypeOf(models.CircleciJob{})) + assert.Nil(t, err) + defer iter.Close() + + var ids []string + for iter.HasNext() { + item, err := iter.Fetch() + assert.Nil(t, err) + job := item.(*models.CircleciJob) + ids = append(ids, job.Id) + } + sort.Strings(ids) + + // Distinct workflow ids for connection 1's non-terminal jobs, with Id populated + // (the URL template reads .Input.Id). wf-success (terminal) and wf-otherconn + // (connection 2) are excluded. + assert.Equal(t, []string{"wf-onhold", "wf-queued"}, ids) + for _, id := range ids { + assert.NotEmpty(t, id, "Input.Id must be the workflow id, not empty (#8907)") + } +} diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go index 92b815fbe88..00fd234524f 100644 --- a/backend/plugins/circleci/tasks/job_collector.go +++ b/backend/plugins/circleci/tasks/job_collector.go @@ -46,7 +46,7 @@ var CollectJobsMeta = plugin.SubTaskMeta{ // recollected by the CollectJobs "unfinished details" collector. func UnfinishedJobsInputClauses(connectionId uint64, projectSlug string) []dal.Clause { return []dal.Clause{ - dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once + dal.Select("DISTINCT workflow_id AS id"), // #8907: alias to id so {{ .Input.Id }} resolves when scanned into CircleciJob dal.From(&models.CircleciJob{}), dal.Where( "connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')",