Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,172 +21,74 @@ import (
"encoding/json"
"net/http"
"net/url"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
)

// ApiCollectorStateManager save collector state in framework table
type ApiCollectorStateManager struct {
// StatefulApiCollector runs multiple collectors as a single subtask and maintains the state of the collector
// mainly the time range to collect across multiple collections. It is useful when you need to support timeAfter
// and diff sync for APIs that do not support filtering by the updated date.
type StatefulApiCollector struct {
RawDataSubTaskArgs
CollectorStateManager
// *ApiCollector
// *GraphqlCollector
subtasks []plugin.SubTask
newState models.CollectorLatestState
IsIncremental bool
Since *time.Time
Before *time.Time
nestedCollectors []plugin.SubTask
}

type CollectorOptions struct {
TimeAfter string `json:"timeAfter,omitempty" mapstructure:"timeAfter,omitempty"`
}

// NewStatefulApiCollector create a new ApiCollectorStateManager
func NewStatefulApiCollector(args RawDataSubTaskArgs) (*ApiCollectorStateManager, errors.Error) {
db := args.Ctx.GetDal()
// NewStatefulApiCollector create a new StatefulApiCollector
func NewStatefulApiCollector(args RawDataSubTaskArgs) (*StatefulApiCollector, errors.Error) {
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
rawDataSubTask, err := NewRawDataSubTask(args)
if err != nil {
return nil, errors.Default.Wrap(err, "Couldn't resolve raw subtask args")
}

// get optionTimeAfter from options
data := args.Ctx.GetData()
value := reflect.ValueOf(data)
if value.Kind() == reflect.Ptr && value.Elem().Kind() == reflect.Struct {
options := value.Elem().FieldByName("Options")
if options.IsValid() && options.Kind() == reflect.Ptr && options.Elem().Kind() == reflect.Struct {
collectorOptions := options.Elem().FieldByName("CollectorOptions")
if collectorOptions.IsValid() && collectorOptions.Kind() == reflect.Struct {
timeAfter := collectorOptions.FieldByName("TimeAfter")
if timeAfter.IsValid() && timeAfter.Kind() == reflect.String && timeAfter.String() != "" {
optionTimeAfter, parseErr := time.Parse(time.RFC3339, timeAfter.String())
if parseErr != nil {
return nil, errors.Default.Wrap(parseErr, "Failed to parse timeAfter!")
}
if syncPolicy != nil {
syncPolicy.TimeAfter = &optionTimeAfter
} else {
syncPolicy = &models.SyncPolicy{
TimeAfter: &optionTimeAfter,
}
}
}
}
}
return nil, err
}

// CollectorLatestState retrieves the latest collector state from the database
oldState := models.CollectorLatestState{}
err = db.First(&oldState, dal.Where(`raw_data_table = ? AND raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
stateManager, err := NewCollectorStateManager(args.Ctx, syncPolicy, rawDataSubTask.table, rawDataSubTask.params)
if err != nil {
if db.IsErrorNotFound(err) {
oldState = models.CollectorLatestState{
RawDataTable: rawDataSubTask.table,
RawDataParams: rawDataSubTask.params,
}
} else {
return nil, errors.Default.Wrap(err, "failed to load JiraLatestCollectorMeta")
}
}
// Extract timeAfter and latestSuccessStart from old state
oldTimeAfter := oldState.TimeAfter
oldLatestSuccessStart := oldState.LatestSuccessStart

// Calculate incremental and since based on syncPolicy and old state
var isIncremental bool
var since *time.Time

if oldLatestSuccessStart == nil {
// 1. If no oldState.LatestSuccessStart, not incremental and since is syncPolicy.TimeAfter
isIncremental = false
if syncPolicy != nil {
since = syncPolicy.TimeAfter
}
} else if syncPolicy == nil {
// 2. If no syncPolicy, incremental and since is oldState.LatestSuccessStart
isIncremental = true
since = oldLatestSuccessStart
} else if syncPolicy.FullSync {
// 3. If fullSync true, not incremental and since is syncPolicy.TimeAfter
isIncremental = false
since = syncPolicy.TimeAfter
} else if syncPolicy.TimeAfter == nil {
// 4. If no syncPolicy TimeAfter, incremental and since is oldState.LatestSuccessStart
isIncremental = true
since = oldLatestSuccessStart
} else {
// 5. If syncPolicy.TimeAfter not nil
if oldTimeAfter != nil && syncPolicy.TimeAfter.Before(*oldTimeAfter) {
// 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
isIncremental = false
since = syncPolicy.TimeAfter
} else {
// 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
isIncremental = true
since = oldLatestSuccessStart
}
}

currentTime := time.Now()
oldState.LatestSuccessStart = &currentTime
if syncPolicy != nil {
oldState.TimeAfter = syncPolicy.TimeAfter
if syncPolicy.TimeAfter != nil && oldTimeAfter != nil && (oldTimeAfter).Before(*syncPolicy.TimeAfter) && !syncPolicy.FullSync {
oldState.TimeAfter = oldTimeAfter
}
return nil, err
}

return &ApiCollectorStateManager{
RawDataSubTaskArgs: args,
newState: oldState,
IsIncremental: isIncremental,
Since: since,
Before: &currentTime,
return &StatefulApiCollector{
RawDataSubTaskArgs: args,
CollectorStateManager: *stateManager,
}, nil

}

// InitCollector init the embedded collector
func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) errors.Error {
// InitCollector appends a new collector to the list
func (m *StatefulApiCollector) InitCollector(args ApiCollectorArgs) errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
args.Incremental = args.Incremental || m.IsIncremental
args.Incremental = m.CollectorStateManager.IsIncremental()
apiCollector, err := NewApiCollector(args)
if err != nil {
return err
}
m.subtasks = append(m.subtasks, apiCollector)
m.nestedCollectors = append(m.nestedCollectors, apiCollector)
return nil
}

// InitGraphQLCollector init the embedded collector
func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error {
// InitGraphQLCollector appends a new GraphQL collector to the list
func (m *StatefulApiCollector) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
args.Incremental = args.Incremental || m.IsIncremental
args.Incremental = m.CollectorStateManager.IsIncremental()
graphqlCollector, err := NewGraphqlCollector(args)
if err != nil {
return err
}
m.subtasks = append(m.subtasks, graphqlCollector)
m.nestedCollectors = append(m.nestedCollectors, graphqlCollector)
return nil
}

// Execute the embedded collector and record execute state
func (m *ApiCollectorStateManager) Execute() errors.Error {
for _, subtask := range m.subtasks {
// Execute all nested collectors and save the state if all collectors succeed
func (m *StatefulApiCollector) Execute() errors.Error {
for _, subtask := range m.nestedCollectors {
err := subtask.Execute()
if err != nil {
return err
}
}

db := m.Ctx.GetDal()
return db.CreateOrUpdate(&m.newState)
return m.CollectorStateManager.Close()
}

// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync support for
Expand Down Expand Up @@ -222,8 +124,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
return nil, err
}

createdAfter := manager.Since
isIncremental := manager.IsIncremental
createdAfter := manager.CollectorStateManager.GetSince()
isIncremental := manager.CollectorStateManager.IsIncremental()

// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
Expand Down Expand Up @@ -329,7 +231,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
type FinalizableApiCollectorArgs struct {
RawDataSubTaskArgs
ApiClient RateLimitedApiClient
TimeAfter *time.Time // leave it be nil to disable time filter
CollectNewRecordsByList FinalizableApiCollectorListArgs
CollectUnfinishedDetails *FinalizableApiCollectorDetailArgs
}
Expand Down
15 changes: 8 additions & 7 deletions backend/plugins/azuredevops_go/tasks/pr_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package tasks

import (
"fmt"
"net/url"
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"net/url"
"time"
)

func init() {
Expand Down Expand Up @@ -51,12 +52,12 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
Options: data.Options,
}

collectorWithState, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}

err = collectorWithState.InitCollector(api.ApiCollectorArgs{
err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
PageSize: 100,
Expand All @@ -67,9 +68,9 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("$skip", fmt.Sprint(reqData.Pager.Skip))
query.Set("$top", fmt.Sprint(reqData.Pager.Size))

if collectorWithState.Since != nil {
if apiCollector.GetSince() != nil {
query.Set("searchCriteria.queryTimeRangeType", "created")
query.Set("searchCriteria.minTime", collectorWithState.Since.Format(time.RFC3339))
query.Set("searchCriteria.minTime", apiCollector.GetSince().Format(time.RFC3339))
}
return query, nil
},
Expand All @@ -81,5 +82,5 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
return err
}

return collectorWithState.Execute()
return apiCollector.Execute()
}
24 changes: 12 additions & 12 deletions backend/plugins/bitbucket/tasks/api_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func GetQuery(reqData *api.RequestData) (url.Values, errors.Error) {
}

// GetQueryCreatedAndUpdated is a common GeyQuery for timeFilter and incremental
func GetQueryCreatedAndUpdated(fields string, collectorWithState *api.ApiCollectorStateManager) func(reqData *api.RequestData) (url.Values, errors.Error) {
func GetQueryCreatedAndUpdated(fields string, apiCollector *api.StatefulApiCollector) func(reqData *api.RequestData) (url.Values, errors.Error) {
return func(reqData *api.RequestData) (url.Values, errors.Error) {
query, err := GetQuery(reqData)
if err != nil {
Expand All @@ -102,8 +102,8 @@ func GetQueryCreatedAndUpdated(fields string, collectorWithState *api.ApiCollect
query.Set("fields", fields)
query.Set("sort", "created_on")

if collectorWithState.Since != nil {
query.Set("q", fmt.Sprintf("updated_on>=%s", collectorWithState.Since.Format(time.RFC3339)))
if apiCollector.GetSince() != nil {
query.Set("q", fmt.Sprintf("updated_on>=%s", apiCollector.GetSince().Format(time.RFC3339)))
}
return query, nil
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Er
return rawMessages.Values, nil
}

func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
Expand All @@ -175,8 +175,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *collectorWithState.Since))
if sac.IsIncremental() && sac.GetSince() != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *sac.GetSince()))
}

// construct the input iterator
Expand All @@ -188,7 +188,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketInput{}))
}

func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
func GetIssuesIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
Expand All @@ -199,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *collectorWithState.Since))
if sac.IsIncremental() && sac.GetSince() != nil {
clauses = append(clauses, dal.Where("bitbucket_updated_at > ?", *sac.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
Expand All @@ -211,7 +211,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.Ap
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketInput{}))
}

func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
func GetPipelinesIterator(taskCtx plugin.SubTaskContext, sac *api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
Expand All @@ -222,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", *collectorWithState.Since))
if sac.IsIncremental() && sac.GetSince() != nil {
clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", *sac.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
Expand Down
6 changes: 3 additions & 3 deletions backend/plugins/bitbucket_server/tasks/api_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func GetRawMessageFromResponse(res *http.Response) ([]json.RawMessage, errors.Er
return rawMessages.Values, nil
}

func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketServerTaskData)
clauses := []dal.Clause{
Expand All @@ -131,8 +131,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState *
),
}

if collectorWithState.IsIncremental && collectorWithState.Since != nil {
clauses = append(clauses, dal.Where("bpr.bitbucket_server_updated_at > ?", *collectorWithState.Since))
if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
clauses = append(clauses, dal.Where("bpr.bitbucket_server_updated_at > ?", *apiCollector.GetSince()))
}

// construct the input iterator
Expand Down
Loading