diff --git a/models/migrationscripts/20220711_add_subtasks_table.go b/models/migrationscripts/20220711_add_subtasks_table.go new file mode 100644 index 00000000000..3798289f883 --- /dev/null +++ b/models/migrationscripts/20220711_add_subtasks_table.go @@ -0,0 +1,39 @@ +package migrationscripts + +import ( + "context" + commonArchived "github.com/apache/incubator-devlake/models/migrationscripts/archived" + "gorm.io/gorm" + "time" +) + +type addSubtasksTable struct { +} + +// Subtask20220711 DB snapshot model of models.Subtask +type Subtask20220711 struct { + commonArchived.Model + TaskID uint64 `json:"task_id" gorm:"index"` + SubtaskName string `json:"name" gorm:"column:name;index"` + Number int `json:"number"` + BeganAt *time.Time `json:"beganAt"` + FinishedAt *time.Time `json:"finishedAt" gorm:"index"` + SpentSeconds int64 `json:"spentSeconds"` +} + +func (s Subtask20220711) TableName() string { + return "_devlake_subtasks" +} + +func (u addSubtasksTable) Up(_ context.Context, db *gorm.DB) error { + err := db.Migrator().AutoMigrate(&Subtask20220711{}) + return err +} + +func (u addSubtasksTable) Version() uint64 { + return 20220711000001 +} + +func (u addSubtasksTable) Name() string { + return "create subtask schema" +} diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go index c49fab8e9ef..47591a18651 100644 --- a/models/migrationscripts/register.go +++ b/models/migrationscripts/register.go @@ -33,5 +33,6 @@ func All() []migration.Script { new(removeNotes), new(addProjectMapping), new(addNoPKModelToCommitParent), + new(addSubtasksTable), } } diff --git a/models/task.go b/models/task.go index 9f78d6cfa97..12005763336 100644 --- a/models/task.go +++ b/models/task.go @@ -68,6 +68,20 @@ type NewTask struct { PipelineCol int `json:"-"` } +type Subtask struct { + common.Model + TaskID uint64 `json:"task_id" gorm:"index"` + Name string `json:"name" gorm:"index"` + Number int `json:"number"` + BeganAt *time.Time `json:"beganAt"` + FinishedAt *time.Time `json:"finishedAt" gorm:"index"` + SpentSeconds int64 `json:"spentSeconds"` +} + func (Task) TableName() string { return "_devlake_tasks" } + +func (Subtask) TableName() string { + return "_devlake_subtasks" +} diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go index b2a66be69f3..ee66becde61 100644 --- a/plugins/core/plugin_task.go +++ b/plugins/core/plugin_task.go @@ -19,7 +19,6 @@ package core import ( "context" - "github.com/apache/incubator-devlake/plugins/core/dal" "gorm.io/gorm" ) diff --git a/runner/directrun.go b/runner/directrun.go index 748f5be28ef..b93dd643351 100644 --- a/runner/directrun.go +++ b/runner/directrun.go @@ -79,11 +79,13 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, op panic(err) } ctx := createContext() + var parentTaskID uint64 = 0 err = RunPluginSubTasks( ctx, cfg, log, db, + parentTaskID, cmd.Use, tasks, options, diff --git a/runner/run_task.go b/runner/run_task.go index b1f783ee6a9..55ec737c209 100644 --- a/runner/run_task.go +++ b/runner/run_task.go @@ -38,7 +38,7 @@ import ( // RunTask FIXME ... func RunTask( ctx context.Context, - cfg *viper.Viper, + _ *viper.Viper, logger core.Logger, db *gorm.DB, progress chan core.RunningProgress, @@ -112,6 +112,7 @@ func RunTask( config.GetConfig(), logger.Nested(task.Plugin), db, + task.ID, task.Plugin, subtasks, options, @@ -126,6 +127,7 @@ func RunPluginTask( cfg *viper.Viper, logger core.Logger, db *gorm.DB, + taskID uint64, name string, subtasks []string, options map[string]interface{}, @@ -145,6 +147,7 @@ func RunPluginTask( cfg, logger, db, + taskID, name, subtasks, options, @@ -159,8 +162,9 @@ func RunPluginSubTasks( cfg *viper.Viper, logger core.Logger, db *gorm.DB, + taskID uint64, name string, - subtasks []string, + subtaskNames []string, options map[string]interface{}, pluginTask core.PluginTask, progress chan core.RunningProgress, @@ -181,10 +185,10 @@ func RunPluginSubTasks( */ // user specifies what subtasks to run - if len(subtasks) != 0 { + if len(subtaskNames) != 0 { // decode user specified subtasks var specifiedTasks []string - err := mapstructure.Decode(subtasks, &specifiedTasks) + err := mapstructure.Decode(subtaskNames, &specifiedTasks) if err != nil { return err } @@ -231,7 +235,7 @@ func RunPluginSubTasks( // execute subtasks in order taskCtx.SetProgress(0, steps) - i := 0 + subtaskNumber := 0 for _, subtaskMeta := range subtaskMetas { subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name) if err != nil { @@ -245,15 +249,15 @@ func RunPluginSubTasks( // run subtask logger.Info("executing subtask %s", subtaskMeta.Name) - i++ + subtaskNumber++ if progress != nil { progress <- core.RunningProgress{ Type: core.SetCurrentSubTask, SubTaskName: subtaskMeta.Name, - SubTaskNumber: i, + SubTaskNumber: subtaskNumber, } } - err = subtaskMeta.EntryPoint(subtaskCtx) + err = runSubtask(logger, db, taskID, subtaskNumber, subtaskCtx, subtaskMeta.EntryPoint) if err != nil { return &errors.SubTaskError{ SubTaskName: subtaskMeta.Name, @@ -292,3 +296,33 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, taskId uint64, progre progressDetail.SubTaskNumber = p.SubTaskNumber } } + +func runSubtask( + logger core.Logger, + db *gorm.DB, + parentID uint64, + subtaskNumber int, + ctx core.SubTaskContext, + entryPoint core.SubTaskEntryPoint, +) error { + beginAt := time.Now() + subtask := &models.Subtask{ + Name: ctx.GetName(), + TaskID: parentID, + Number: subtaskNumber, + BeganAt: &beginAt, + } + defer func() { + finishedAt := time.Now() + subtask.FinishedAt = &finishedAt + subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix() + recordSubtask(logger, db, subtask) + }() + return entryPoint(ctx) +} + +func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) { + if err := db.Create(&subtask).Error; err != nil { + logger.Error("error writing subtask %d status to DB: %v", subtask.ID, err) + } +}