Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subtask state manager #7384

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package migrationscripts

import (
"time"

"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
)

// Compile-time assertion that *addSubtaskStates implements plugin.MigrationScript.
var _ plugin.MigrationScript = (*addSubtaskStates)(nil)

// subtaskState20240424 is the frozen schema snapshot used by the
// addSubtaskStates migration to create the _devlake_subtask_states table.
//
// Its columns must line up with what models.SubtaskState reads and writes at
// runtime: the subtask state manager looks rows up by (plugin, subtask, params)
// and persists prev_config, time_after and prev_started_at. The earlier shape
// of this snapshot declared raw_data_table / raw_data_params /
// latest_success_start instead, which would leave the migrated table without
// the columns the model expects.
//
// NOTE(review): the key widths for plugin/subtask are kept at varchar(50) as
// originally written here, while the model's tags say varchar(255) — confirm
// which width is intended.
type subtaskState20240424 struct {
	CreatedAt time.Time `json:"createdAt"`
	UpdatedAt time.Time `json:"updatedAt"`
	Plugin    string    `gorm:"primaryKey;type:varchar(50)" json:"plugin"`
	Subtask   string    `gorm:"primaryKey;type:varchar(50)" json:"subtask"`
	// Params identifies the scope (e.g. a jira board or github repo) the state row belongs to.
	Params string `gorm:"primaryKey;type:varchar(255);index" json:"params"`
	// PrevConfig holds the subtask configuration recorded by the previous run.
	PrevConfig string `json:"prevConfig"`
	// TimeAfter holds the timeAfter value recorded by the previous full sync.
	TimeAfter *time.Time `json:"timeAfter"`
	// PrevStartedAt holds the start time recorded by the previous run.
	PrevStartedAt *time.Time `json:"prevStartedAt"`
}

// TableName returns the name of the database table this schema snapshot maps to.
func (subtaskState20240424) TableName() string {
return "_devlake_subtask_states"
}

// addSubtaskStates is the migration script that adds the _devlake_subtask_states table.
type addSubtaskStates struct{}

// Up applies the migration by auto-migrating the subtaskState20240424 schema
// snapshot through the data access layer obtained from basicRes.
//
// Any failure is returned to the caller instead of being turned into a panic,
// so whoever runs the migration can handle or report it through the normal
// errors.Error path this method already declares.
func (script *addSubtaskStates) Up(basicRes context.BasicRes) errors.Error {
	return basicRes.GetDal().AutoMigrate(&subtaskState20240424{})
}

// Version returns the numeric version of this migration script.
// NOTE(review): the value looks like a yyyymmddhhmmss timestamp (2024-04-24 15:27:34) — confirm the convention.
func (*addSubtaskStates) Version() uint64 {
return 20240424152734
}

// Name returns a short human-readable description of this migration script.
func (*addSubtaskStates) Name() string {
return "add _devlake_subtask_states"
}
1 change: 1 addition & 0 deletions backend/core/models/migrationscripts/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@ func All() []plugin.MigrationScript {
new(addStore),
new(addSubtaskField),
new(addDisplayTitleAndUrl),
new(addSubtaskStates),
}
}
40 changes: 40 additions & 0 deletions backend/core/models/subtask_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package models

import (
"time"
)

// SubtaskState is the persisted state of a subtask for one scope.
// A row is identified by the composite primary key (Plugin, Subtask, Params).
type SubtaskState struct {
// Plugin is the name of the plugin the subtask belongs to.
Plugin string `gorm:"primaryKey;type:varchar(255)" json:"plugin"`
// Subtask is the name of the subtask.
Subtask string `gorm:"primaryKey;type:varchar(255)" json:"subtask"`
// Params is a json string to identify rows of a specific scope (jira board, github repo)
Params string `gorm:"primaryKey;type:varchar(255);index" json:"params"`
// PrevConfig stores the previous configuration of the subtask for determining whether the subtask should run in Incremental or FullSync mode
PrevConfig string `json:"prevConfig"`
// TimeAfter stores the previous timeAfter specified by the user for determining whether the subtask should run in Incremental or FullSync mode
TimeAfter *time.Time `json:"timeAfter"`
// PrevStartedAt stores a start time recorded for the previous run; nil when no previous run has been recorded.
PrevStartedAt *time.Time `json:"prevStartedAt"`
// CreatedAt and UpdatedAt are row timestamps.
// NOTE(review): presumably maintained automatically by the ORM — confirm.
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}

// TableName returns the name of the database table that stores SubtaskState rows.
func (SubtaskState) TableName() string {
return "_devlake_subtask_states"
}
9 changes: 9 additions & 0 deletions backend/core/utils/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package utils

import (
"encoding/json"
"fmt"
"reflect"

Expand Down Expand Up @@ -72,3 +73,11 @@ func Convert[T any](value any) (T, errors.Error) {
return result, nil
}
}

// ToJsonString serializes x with encoding/json and returns the result as a string.
// It panics with the marshalling error when x cannot be encoded, so it is only
// suitable for values that are known to be JSON-serializable.
func ToJsonString(x any) string {
	encoded, marshalErr := json.Marshal(x)
	if marshalErr == nil {
		return string(encoded)
	}
	panic(marshalErr)
}
128 changes: 128 additions & 0 deletions backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
plugin "github.com/apache/incubator-devlake/core/plugin"
)

// SubtaskCommonArgs is a struct that contains the common arguments for a subtask.
// It embeds plugin.SubTaskContext, so the context's methods can be called
// directly on the args value.
type SubtaskCommonArgs struct {
plugin.SubTaskContext
Params string // for filtering rows belonging to the scope (jira board, github repo) of the subtask
SubtaskConfig string // for determining whether the subtask should run in incremental or full sync mode
BatchSize int // batch size for saving data
}

// SubtaskStateManager manages the state of a subtask. It is used to determine whether
// the subtask should run in incremental mode or full sync mode and what time range to collect.
// Instances are built by NewSubtaskStateManager; Close writes the updated state back to the database.
type SubtaskStateManager struct {
// db is the data access layer used to load and persist the state row
db dal.Dal
// state is the row loaded from the database, or a fresh one when none existed
state *models.SubtaskState
// syncPolicy is the sync policy of the current task; never nil after construction
syncPolicy *models.SyncPolicy
isIncremental bool // tells if the subtask should run in incremental mode or full sync mode
since *time.Time // the start time of the time range to work on
until *time.Time // the end time of the time range to work on
config string // current configuration of the subtask for determining if the subtask should run in incremental or full sync mode
}

// NewSubtaskStateManager creates a SubtaskStateManager for the subtask described by args.
//
// It loads the state row stored for (plugin, subtask, args.Params), starting
// from an empty state when no row exists, and then decides the run mode and
// the time range:
//   - full sync (the default): since is the sync policy's TimeAfter, falling
//     back to the TimeAfter stored in the previous state.
//   - incremental: chosen only when full sync is not requested, a previous
//     start time is recorded, the policy's TimeAfter is unset or not before
//     the stored one, and the stored config equals args.SubtaskConfig; since
//     then becomes the previous start time.
//
// until is always the time at which the manager was created. A non-nil error
// is returned when loading the previous state fails for any reason other than
// the row not being found.
func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskStateManager, err errors.Error) {
	db := args.GetDal()
	taskCtx := args.SubTaskContext.TaskContext()
	pluginName, subtaskName := taskCtx.GetName(), args.SubTaskContext.GetName()

	// a task without a sync policy behaves like one with an empty policy
	policy := taskCtx.SyncPolicy()
	if policy == nil {
		policy = &models.SyncPolicy{}
	}

	// fetch what the previous run left behind, if anything
	prev := &models.SubtaskState{}
	loadErr := db.First(prev, dal.Where(`plugin = ? AND subtask =? AND params = ?`, pluginName, subtaskName, args.Params))
	if loadErr != nil {
		if !db.IsErrorNotFound(loadErr) {
			return nil, errors.Default.Wrap(loadErr, "failed to load the previous subtask state")
		}
		// first run for this scope: start from a blank state carrying only the key
		prev = &models.SubtaskState{Plugin: pluginName, Subtask: subtaskName, Params: args.Params}
	}

	createdAt := time.Now()

	// full sync range: the user supplied timeAfter wins, otherwise reuse the stored one
	since := policy.TimeAfter
	if since == nil {
		since = prev.TimeAfter
	}

	incremental := false
	if !policy.FullSync && prev.PrevStartedAt != nil {
		// moving timeAfter earlier than before requires collecting older data again
		timeAfterCompatible := policy.TimeAfter == nil ||
			prev.TimeAfter == nil ||
			!policy.TimeAfter.Before(*prev.TimeAfter)
		if timeAfterCompatible && prev.PrevConfig == args.SubtaskConfig {
			incremental = true
			since = prev.PrevStartedAt
		}
	}

	return &SubtaskStateManager{
		db:            db,
		state:         prev,
		syncPolicy:    policy,
		isIncremental: incremental,
		since:         since,
		until:         &createdAt,
		config:        args.SubtaskConfig,
	}, nil
}

// IsIncremental reports whether the subtask should run in incremental mode (true) or full sync mode (false).
func (c *SubtaskStateManager) IsIncremental() bool {
return c.isIncremental
}

// GetSince returns the start of the time range to work on; it may be nil when no lower bound is known.
func (c *SubtaskStateManager) GetSince() *time.Time {
return c.since
}

// GetUntil returns the end of the time range to work on.
func (c *SubtaskStateManager) GetUntil() *time.Time {
return c.until
}

// Close writes the state of the finished run back to the database.
//
// The stored timeAfter is refreshed only after a full sync, and only when the
// sync policy carries a non-nil value, so an existing value is never replaced
// by nil. The end of the processed range (until) is recorded as the previous
// start time, and the current config as the previous config, for the next run
// to compare against. It returns the error reported by the database update.
func (c *SubtaskStateManager) Close() errors.Error {
	if fullSync := !c.isIncremental; fullSync && c.syncPolicy.TimeAfter != nil {
		c.state.TimeAfter = c.syncPolicy.TimeAfter
	}
	// these two are recorded on every run, regardless of the mode
	c.state.PrevConfig = c.config
	c.state.PrevStartedAt = c.until
	return c.db.Update(c.state)
}
Loading
Loading