diff --git a/backend/plugins/circleci/tasks/job_collector.go b/backend/plugins/circleci/tasks/job_collector.go
index 06241a547fa..60fae1b9bc3 100644
--- a/backend/plugins/circleci/tasks/job_collector.go
+++ b/backend/plugins/circleci/tasks/job_collector.go
@@ -68,6 +68,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 		GetNextPageCustomData: ExtractNextPageToken,
 		Query:                 BuildQueryParamsWithPageToken,
 		ResponseParser:        ParseCircleciPageTokenResp,
+		AfterResponse:         ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted
 	})
 	if err != nil {
 		logger.Error(err, "collect jobs error")
diff --git a/backend/plugins/circleci/tasks/pipeline_collector.go b/backend/plugins/circleci/tasks/pipeline_collector.go
index b7940e6c8bf..20055f89405 100644
--- a/backend/plugins/circleci/tasks/pipeline_collector.go
+++ b/backend/plugins/circleci/tasks/pipeline_collector.go
@@ -18,6 +18,10 @@ limitations under the License.
 package tasks
 
 import (
+	"encoding/json"
+	"net/http"
+	"time"
+
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
@@ -38,6 +42,7 @@ var CollectPipelinesMeta = plugin.SubTaskMeta{
 func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
 	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
 	logger := taskCtx.GetLogger()
+	timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
 	logger.Info("collect pipelines")
 	collector, err := api.NewApiCollector(api.ApiCollectorArgs{
 		RawDataSubTaskArgs: *rawDataSubTaskArgs,
@@ -46,7 +51,32 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
 		PageSize:              int(data.Options.PageSize),
 		GetNextPageCustomData: ExtractNextPageToken,
 		Query:                 BuildQueryParamsWithPageToken,
-		ResponseParser:        ParseCircleciPageTokenResp,
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			data := CircleciPageTokenResp[[]json.RawMessage]{}
+			err := api.UnmarshalResponse(res, &data)
+
+			if err != nil {
+				return nil, err
+			}
+			filteredItems := []json.RawMessage{}
+			for _, item := range data.Items {
+				var pipeline struct {
+					CreatedAt time.Time `json:"created_at"`
+				}
+				if err := json.Unmarshal(item, &pipeline); err != nil {
+					return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item")
+				}
+				// Stop paging at the first pipeline created before timeAfter
+				// (presumably the API lists newest first; confirm). timeAfter is a
+				// pointer, so guard against nil to avoid a panic on dereference.
+				if timeAfter != nil && pipeline.CreatedAt.Before(*timeAfter) {
+					return filteredItems, api.ErrFinishCollect
+				}
+				filteredItems = append(filteredItems, item)
+
+			}
+			return filteredItems, nil
+		},
 	})
 	if err != nil {
 		logger.Error(err, "collect pipelines error")
diff --git a/backend/plugins/circleci/tasks/shared.go b/backend/plugins/circleci/tasks/shared.go
index 998b1419c74..89b9321a75a 100644
--- a/backend/plugins/circleci/tasks/shared.go
+++ b/backend/plugins/circleci/tasks/shared.go
@@ -121,3 +121,12 @@ func ParseCircleciPageTokenResp(res *http.Response) ([]json.RawMessage, errors.E
 	err := api.UnmarshalResponse(res, &data)
 	return data.Items, err
 }
+
+func ignoreDeletedBuilds(res *http.Response) errors.Error {
+	// CircleCI API will return a 404 response for a workflow/job that has been deleted
+	// due to their data retention policy. We should ignore these errors.
+	if res.StatusCode == http.StatusNotFound {
+		return api.ErrIgnoreAndContinue
+	}
+	return nil
+}
diff --git a/backend/plugins/circleci/tasks/workflow_collector.go b/backend/plugins/circleci/tasks/workflow_collector.go
index cdb4e1c36cc..342efc82378 100644
--- a/backend/plugins/circleci/tasks/workflow_collector.go
+++ b/backend/plugins/circleci/tasks/workflow_collector.go
@@ -68,6 +68,7 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
 		GetNextPageCustomData: ExtractNextPageToken,
 		Query:                 BuildQueryParamsWithPageToken,
 		ResponseParser:        ParseCircleciPageTokenResp,
+		AfterResponse:         ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
 	})
 	if err != nil {
 		logger.Error(err, "collect workflows error")