Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/plugins/circleci/tasks/job_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted
})
if err != nil {
logger.Error(err, "collect jobs error")
Expand Down
29 changes: 28 additions & 1 deletion backend/plugins/circleci/tasks/pipeline_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ limitations under the License.
package tasks

import (
"encoding/json"
"net/http"
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
Expand All @@ -38,6 +42,7 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
logger := taskCtx.GetLogger()
timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
logger.Info("collect pipelines")
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
Expand All @@ -46,7 +51,29 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
	// Decode the page envelope; each item stays as raw JSON so it can be
	// returned to the collector untouched.
	page := CircleciPageTokenResp[[]json.RawMessage]{}
	if err := api.UnmarshalResponse(res, &page); err != nil {
		return nil, err
	}

	kept := make([]json.RawMessage, 0, len(page.Items))
	for _, item := range page.Items {
		// Only created_at is needed to decide whether the item is in range.
		var pipeline struct {
			CreatedAt time.Time `json:"created_at"`
		}
		if err := json.Unmarshal(item, &pipeline); err != nil {
			return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item")
		}
		// timeAfter is a pointer and is dereferenced here; guard against nil
		// (presumably the case when no time filter is configured — confirm
		// against SyncPolicy) so that every item is kept instead of panicking.
		if timeAfter != nil && pipeline.CreatedAt.Before(*timeAfter) {
			// The first item older than the cutoff ends collection: return
			// what was accepted from this page and signal the collector to stop.
			// NOTE(review): this relies on the API returning pipelines
			// newest-first — confirm.
			return kept, api.ErrFinishCollect
		}
		kept = append(kept, item)
	}
	return kept, nil
},
})
if err != nil {
logger.Error(err, "collect pipelines error")
Expand Down
9 changes: 9 additions & 0 deletions backend/plugins/circleci/tasks/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,12 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
err := api.UnmarshalResponse(res, &data)
return data.Items, err
}

func ignoreDeletedBuilds(res *http.Response) errors.Error {
// CircleCI API will return a 404 response for a workflow/job that has been deleted
// due to their data retention policy. We should ignore these errors.
if res.StatusCode == http.StatusNotFound {
return api.ErrIgnoreAndContinue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be api.ErrFinishCollect? If the 404 is caused by "data retention", there would be no more data to collect, would there?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With api.ErrFinishCollect, collection could miss workflows/jobs that are still available if the CI data isn't processed strictly in order from newest to oldest. With api.ErrIgnoreAndContinue, those rows would still be collected if that happens.

E.g., say there are 600 pipelines to be collected and 1 has been deleted. If they end up out of order and the deleted record is processed as item 300 of 600, the DevLake pipeline would end with 300 rows uncollected.

There shouldn't be a significant number of deleted workflows and jobs among those we attempt to collect (as the associated pipelines will also have been deleted), so the increase in DevLake pipeline duration should be minimal and worth the reduced risk of missing rows.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, I see. It makes total sense.
Sorry for taking so long; I was on vacation and somehow missed the notification.

}
return nil
}
1 change: 1 addition & 0 deletions backend/plugins/circleci/tasks/workflow_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
})
if err != nil {
logger.Error(err, "collect workflows error")
Expand Down