Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions models/migrationscripts/20220711_add_subtasks_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package migrationscripts

import (
"context"
commonArchived "github.com/apache/incubator-devlake/models/migrationscripts/archived"
"gorm.io/gorm"
"time"
)

type addSubtasksTable struct {
}

// Subtask20220711 DB snapshot model of models.Subtask
type Subtask20220711 struct {
commonArchived.Model
TaskID uint64 `json:"task_id" gorm:"index"`
SubtaskName string `json:"name" gorm:"column:name;index"`
Number int `json:"number"`
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
SpentSeconds int64 `json:"spentSeconds"`
}

func (s Subtask20220711) TableName() string {
return "_devlake_subtasks"
}

func (u addSubtasksTable) Up(_ context.Context, db *gorm.DB) error {
err := db.Migrator().AutoMigrate(&Subtask20220711{})
return err
}

func (u addSubtasksTable) Version() uint64 {
return 20220711000001
}

func (u addSubtasksTable) Name() string {
return "create subtask schema"
}
1 change: 1 addition & 0 deletions models/migrationscripts/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ func All() []migration.Script {
new(removeNotes),
new(addProjectMapping),
new(addNoPKModelToCommitParent),
new(addSubtasksTable),
}
}
14 changes: 14 additions & 0 deletions models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ type NewTask struct {
PipelineCol int `json:"-"`
}

type Subtask struct {
common.Model
TaskID uint64 `json:"task_id" gorm:"index"`
Name string `json:"name" gorm:"index"`
Number int `json:"number"`
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
SpentSeconds int64 `json:"spentSeconds"`
}

func (Task) TableName() string {
return "_devlake_tasks"
}

func (Subtask) TableName() string {
return "_devlake_subtasks"
}
1 change: 0 additions & 1 deletion plugins/core/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package core

import (
"context"

"github.com/apache/incubator-devlake/plugins/core/dal"
"gorm.io/gorm"
)
Expand Down
2 changes: 2 additions & 0 deletions runner/directrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, op
panic(err)
}
ctx := createContext()
var parentTaskID uint64 = 0
err = RunPluginSubTasks(
ctx,
cfg,
log,
db,
parentTaskID,
cmd.Use,
tasks,
options,
Expand Down
50 changes: 42 additions & 8 deletions runner/run_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// RunTask FIXME ...
func RunTask(
ctx context.Context,
cfg *viper.Viper,
_ *viper.Viper,
logger core.Logger,
db *gorm.DB,
progress chan core.RunningProgress,
Expand Down Expand Up @@ -112,6 +112,7 @@ func RunTask(
config.GetConfig(),
logger.Nested(task.Plugin),
db,
task.ID,
task.Plugin,
subtasks,
options,
Expand All @@ -126,6 +127,7 @@ func RunPluginTask(
cfg *viper.Viper,
logger core.Logger,
db *gorm.DB,
taskID uint64,
name string,
subtasks []string,
options map[string]interface{},
Expand All @@ -145,6 +147,7 @@ func RunPluginTask(
cfg,
logger,
db,
taskID,
name,
subtasks,
options,
Expand All @@ -159,8 +162,9 @@ func RunPluginSubTasks(
cfg *viper.Viper,
logger core.Logger,
db *gorm.DB,
taskID uint64,
name string,
subtasks []string,
subtaskNames []string,
options map[string]interface{},
pluginTask core.PluginTask,
progress chan core.RunningProgress,
Expand All @@ -181,10 +185,10 @@ func RunPluginSubTasks(
*/

// user specifies what subtasks to run
if len(subtasks) != 0 {
if len(subtaskNames) != 0 {
// decode user specified subtasks
var specifiedTasks []string
err := mapstructure.Decode(subtasks, &specifiedTasks)
err := mapstructure.Decode(subtaskNames, &specifiedTasks)
if err != nil {
return err
}
Expand Down Expand Up @@ -231,7 +235,7 @@ func RunPluginSubTasks(

// execute subtasks in order
taskCtx.SetProgress(0, steps)
i := 0
subtaskNumber := 0
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
Expand All @@ -245,15 +249,15 @@ func RunPluginSubTasks(

// run subtask
logger.Info("executing subtask %s", subtaskMeta.Name)
i++
subtaskNumber++
if progress != nil {
progress <- core.RunningProgress{
Type: core.SetCurrentSubTask,
SubTaskName: subtaskMeta.Name,
SubTaskNumber: i,
SubTaskNumber: subtaskNumber,
}
}
err = subtaskMeta.EntryPoint(subtaskCtx)
err = runSubtask(logger, db, taskID, subtaskNumber, subtaskCtx, subtaskMeta.EntryPoint)
if err != nil {
return &errors.SubTaskError{
SubTaskName: subtaskMeta.Name,
Expand Down Expand Up @@ -292,3 +296,33 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, taskId uint64, progre
progressDetail.SubTaskNumber = p.SubTaskNumber
}
}

func runSubtask(
logger core.Logger,
db *gorm.DB,
parentID uint64,
subtaskNumber int,
ctx core.SubTaskContext,
entryPoint core.SubTaskEntryPoint,
) error {
beginAt := time.Now()
subtask := &models.Subtask{
Name: ctx.GetName(),
TaskID: parentID,
Number: subtaskNumber,
BeganAt: &beginAt,
}
defer func() {
finishedAt := time.Now()
subtask.FinishedAt = &finishedAt
subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
Comment thread
klesh marked this conversation as resolved.
recordSubtask(logger, db, subtask)
}()
return entryPoint(ctx)
}

func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) {
if err := db.Create(&subtask).Error; err != nil {
logger.Error("error writing subtask %d status to DB: %v", subtask.ID, err)
}
}