-
Notifications
You must be signed in to change notification settings - Fork 493
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Gitextractor supports incremental collection (#7319)
* refactor: extract CollectorStateManager from ApiCollector * feat: gitext support diffsync * fix: collector params for gitextractor * docs: update comments * fix: add more cases to TestCollectorStateManager * fix: typo
- Loading branch information
Showing
12 changed files
with
365 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
119 changes: 119 additions & 0 deletions
119
backend/helpers/pluginhelper/api/collector_state_manager.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package api | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/apache/incubator-devlake/core/context" | ||
"github.com/apache/incubator-devlake/core/dal" | ||
"github.com/apache/incubator-devlake/core/errors" | ||
"github.com/apache/incubator-devlake/core/models" | ||
) | ||
|
||
// CollectorStateManager manages the state of the collector. It is used to determine whether | ||
// the collector should run in incremental mode or full sync mode and what time range to collect. | ||
type CollectorStateManager struct { | ||
db dal.Dal | ||
state *models.CollectorLatestState | ||
syncPolicy *models.SyncPolicy | ||
// IsIncremental indicates whether the collector should run in incremental mode or full sync mode | ||
isIncremental bool | ||
// Since is the start time of the time range to collect | ||
since *time.Time | ||
// Until is the end time of the time range to collect | ||
until *time.Time | ||
} | ||
|
||
// NewCollectorStateManager create a new CollectorStateManager | ||
func NewCollectorStateManager(basicRes context.BasicRes, syncPolicy *models.SyncPolicy, rawTable, rawParams string) (stateManager *CollectorStateManager, err errors.Error) { | ||
// load sync policy and make sure it is not nil | ||
if syncPolicy == nil { | ||
syncPolicy = &models.SyncPolicy{} | ||
} | ||
|
||
// load the previous state from the database | ||
db := basicRes.GetDal() | ||
state := &models.CollectorLatestState{} | ||
err = db.First(state, dal.Where(`raw_data_table = ? AND raw_data_params = ?`, rawTable, rawParams)) | ||
if err != nil { | ||
if db.IsErrorNotFound(err) { | ||
state = &models.CollectorLatestState{ | ||
RawDataTable: rawTable, | ||
RawDataParams: rawParams, | ||
} | ||
err = nil | ||
} else { | ||
err = errors.Default.Wrap(err, "failed to load the previous collector state") | ||
return | ||
} | ||
} | ||
|
||
// fullsync by default | ||
now := time.Now() | ||
stateManager = &CollectorStateManager{ | ||
db: db, | ||
state: state, | ||
syncPolicy: syncPolicy, | ||
isIncremental: false, | ||
since: syncPolicy.TimeAfter, | ||
until: &now, | ||
} | ||
// fallback to the previous timeAfter if no new value | ||
if stateManager.since == nil { | ||
stateManager.since = state.TimeAfter | ||
} | ||
|
||
// if fullsync is set or no previous success start time, we are in the full sync mode | ||
if syncPolicy.FullSync || state.LatestSuccessStart == nil { | ||
return | ||
} | ||
|
||
// if timeAfter is not set or NOT before the previous vaule, we are in the incremental mode | ||
if syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter) { | ||
stateManager.isIncremental = true | ||
stateManager.since = state.LatestSuccessStart | ||
} | ||
|
||
return | ||
} | ||
|
||
func (c *CollectorStateManager) IsIncremental() bool { | ||
return c.isIncremental | ||
} | ||
|
||
func (c *CollectorStateManager) GetSince() *time.Time { | ||
return c.since | ||
} | ||
|
||
func (c *CollectorStateManager) GetUntil() *time.Time { | ||
return c.until | ||
} | ||
|
||
func (c *CollectorStateManager) Close() errors.Error { | ||
// update timeAfter in the database only for fullsync mode | ||
if !c.isIncremental { | ||
// prefer non-nil value | ||
if c.syncPolicy.TimeAfter != nil { | ||
c.state.TimeAfter = c.syncPolicy.TimeAfter | ||
} | ||
} | ||
// always update the latest success start time | ||
c.state.LatestSuccessStart = c.until | ||
return c.db.Update(c.state) | ||
} |
160 changes: 160 additions & 0 deletions
160
backend/helpers/pluginhelper/api/collector_state_manager_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package api | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/apache/incubator-devlake/core/errors" | ||
"github.com/apache/incubator-devlake/core/models" | ||
"github.com/apache/incubator-devlake/helpers/unithelper" | ||
mockcontext "github.com/apache/incubator-devlake/mocks/core/context" | ||
mockdal "github.com/apache/incubator-devlake/mocks/core/dal" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
func TestCollectorStateManager(t *testing.T) { | ||
time0 := errors.Must1(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")) | ||
time1 := errors.Must1(time.Parse(time.RFC3339, "2021-01-01T00:00:00Z")) | ||
time2 := errors.Must1(time.Parse(time.RFC3339, "2022-01-01T00:00:00Z")) | ||
for _, tc := range []struct { | ||
name string | ||
state *models.CollectorLatestState | ||
syncPolicy *models.SyncPolicy | ||
expectedIsIncremental bool | ||
expectedSince *time.Time | ||
expectedNewStateTimeAfter *time.Time | ||
}{ | ||
{ | ||
name: "syncPolicy has no timeAfter - First run", | ||
state: &models.CollectorLatestState{LatestSuccessStart: nil}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: nil}, | ||
expectedIsIncremental: false, | ||
expectedSince: nil, | ||
expectedNewStateTimeAfter: nil, | ||
}, | ||
{ | ||
name: "syncPolicy has no timeAfter - Second run", | ||
state: &models.CollectorLatestState{LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: nil}, | ||
expectedIsIncremental: true, | ||
expectedSince: &time1, | ||
expectedNewStateTimeAfter: nil, | ||
}, | ||
{ | ||
name: "syncPolicy has no timeAfter - Third run with timeAfter specified", | ||
state: &models.CollectorLatestState{LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time1}, | ||
expectedIsIncremental: true, | ||
expectedSince: &time1, | ||
expectedNewStateTimeAfter: nil, | ||
}, | ||
{ | ||
name: "syncPolicy has timeAfter - First run", | ||
state: &models.CollectorLatestState{LatestSuccessStart: nil}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time1}, | ||
expectedIsIncremental: false, | ||
expectedSince: &time1, | ||
expectedNewStateTimeAfter: &time1, | ||
}, | ||
{ | ||
name: "syncPolicy has timeAfter - Second run with a later timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time2}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time2}, | ||
expectedIsIncremental: true, | ||
expectedSince: &time2, | ||
expectedNewStateTimeAfter: &time1, | ||
}, | ||
{ | ||
name: "syncPolicy has timeAfter - Third run with a earlier timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time0}, | ||
expectedIsIncremental: false, | ||
expectedSince: &time0, | ||
expectedNewStateTimeAfter: &time0, | ||
}, | ||
{ | ||
name: "syncPolicy has timeAfter - Fourth run with a same timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time2}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time1}, | ||
expectedIsIncremental: true, | ||
expectedSince: &time2, | ||
expectedNewStateTimeAfter: &time1, | ||
}, | ||
{ | ||
name: "Full sync - with timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{FullSync: true}, | ||
expectedIsIncremental: false, | ||
expectedSince: &time1, | ||
expectedNewStateTimeAfter: &time1, | ||
}, | ||
{ | ||
name: "Full sync - with newer timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time2, FullSync: true}, | ||
expectedIsIncremental: false, | ||
expectedSince: &time2, | ||
expectedNewStateTimeAfter: &time2, | ||
}, | ||
{ | ||
name: "Full sync - with older timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: &time1, LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{TimeAfter: &time0, FullSync: true}, | ||
expectedIsIncremental: false, | ||
expectedSince: &time0, | ||
expectedNewStateTimeAfter: &time0, | ||
}, | ||
{ | ||
name: "Full sync - without timeAfter", | ||
state: &models.CollectorLatestState{TimeAfter: nil, LatestSuccessStart: &time1}, | ||
syncPolicy: &models.SyncPolicy{FullSync: true}, | ||
expectedIsIncremental: false, | ||
expectedSince: nil, | ||
expectedNewStateTimeAfter: nil, | ||
}, | ||
} { | ||
started := time.Now() | ||
t.Run(tc.name, func(t *testing.T) { | ||
mockBasicRes := newMockBasicRes(tc.state) | ||
stateManager, err := NewCollectorStateManager(mockBasicRes, tc.syncPolicy, "table", "params") | ||
assert.Nil(t, err) | ||
assert.Equal(t, tc.expectedSince, stateManager.since) | ||
assert.Equal(t, tc.expectedIsIncremental, stateManager.isIncremental) | ||
assert.Nil(t, stateManager.Close()) | ||
assert.Equal(t, tc.expectedNewStateTimeAfter, stateManager.state.TimeAfter) | ||
// LatestSuccessStart should be updated | ||
assert.GreaterOrEqual(t, stateManager.state.LatestSuccessStart.Unix(), started.Unix()) | ||
// First and update should both be called once | ||
mockBasicRes.AssertExpectations(t) | ||
}) | ||
} | ||
} | ||
|
||
func newMockBasicRes(state *models.CollectorLatestState) *mockcontext.BasicRes { | ||
// Refresh Global Variables and set the sql mock | ||
return unithelper.DummyBasicRes(func(mockDal *mockdal.Dal) { | ||
mockDal.On("First", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { | ||
dst := args.Get(0).(*models.CollectorLatestState) | ||
*dst = *state | ||
}).Return(nil).Once() | ||
mockDal.On("Update", mock.Anything, mock.Anything).Return(nil).Once() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.