From 5d8b3abbbd5d1af4d9c7c93ddb36ae913279c615 Mon Sep 17 00:00:00 2001 From: Keon Amini Date: Fri, 8 Jul 2022 17:18:21 -0500 Subject: [PATCH 1/3] feat(framework): add execution time for plugin subtasks (#2433) --- .../20220711_add_subtasks_table.go | 38 ++++++++++++++ models/migrationscripts/register.go | 1 + models/task.go | 13 +++++ plugins/core/plugin_task.go | 1 - runner/directrun.go | 2 + runner/run_task.go | 50 ++++++++++++++++--- 6 files changed, 96 insertions(+), 9 deletions(-) create mode 100644 models/migrationscripts/20220711_add_subtasks_table.go diff --git a/models/migrationscripts/20220711_add_subtasks_table.go b/models/migrationscripts/20220711_add_subtasks_table.go new file mode 100644 index 00000000000..cec232a683a --- /dev/null +++ b/models/migrationscripts/20220711_add_subtasks_table.go @@ -0,0 +1,38 @@ +package migrationscripts + +import ( + "context" + commonArchived "github.com/apache/incubator-devlake/models/migrationscripts/archived" + "gorm.io/gorm" + "time" +) + +type addSubtasksTable struct { +} + +// Subtask20220711 DB snapshot model of models.Subtask +type Subtask20220711 struct { + commonArchived.Model + TaskID uint64 `json:"task_id" gorm:"index"` + SubtaskName string `json:"name" gorm:"column:name;index"` + BeganAt *time.Time `json:"beganAt"` + FinishedAt *time.Time `json:"finishedAt" gorm:"index"` + SpentSeconds int64 `json:"spentSeconds"` +} + +func (s Subtask20220711) TableName() string { + return "_devlake_subtasks" +} + +func (u addSubtasksTable) Up(_ context.Context, db *gorm.DB) error { + err := db.Migrator().AutoMigrate(&Subtask20220711{}) + return err +} + +func (u addSubtasksTable) Version() uint64 { + return 20220711000001 +} + +func (u addSubtasksTable) Name() string { + return "create subtask schema" +} diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go index c49fab8e9ef..47591a18651 100644 --- a/models/migrationscripts/register.go +++ b/models/migrationscripts/register.go @@ -33,5 +33,6 @@ func All() []migration.Script { new(removeNotes), new(addProjectMapping), new(addNoPKModelToCommitParent), + new(addSubtasksTable), } } diff --git a/models/task.go b/models/task.go index 9f78d6cfa97..90a2f501222 100644 --- a/models/task.go +++ b/models/task.go @@ -68,6 +68,19 @@ type NewTask struct { PipelineCol int `json:"-"` } +type Subtask struct { + common.Model + TaskID uint64 `json:"task_id" gorm:"index"` + Name string `json:"name" gorm:"index"` + BeganAt *time.Time `json:"beganAt"` + FinishedAt *time.Time `json:"finishedAt" gorm:"index"` + SpentSeconds int64 `json:"spentSeconds"` +} + func (Task) TableName() string { return "_devlake_tasks" } + +func (Subtask) TableName() string { + return "_devlake_subtasks" +} diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go index b2a66be69f3..ee66becde61 100644 --- a/plugins/core/plugin_task.go +++ b/plugins/core/plugin_task.go @@ -19,7 +19,6 @@ package core import ( "context" - "github.com/apache/incubator-devlake/plugins/core/dal" "gorm.io/gorm" ) diff --git a/runner/directrun.go b/runner/directrun.go index 748f5be28ef..b93dd643351 100644 --- a/runner/directrun.go +++ b/runner/directrun.go @@ -79,11 +79,13 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, op panic(err) } ctx := createContext() + var parentTaskID uint64 = 0 err = RunPluginSubTasks( ctx, cfg, log, db, + parentTaskID, cmd.Use, tasks, options, diff --git a/runner/run_task.go b/runner/run_task.go index b1f783ee6a9..6cf1e99ec52 100644 --- a/runner/run_task.go +++ b/runner/run_task.go @@ -38,7 +38,7 @@ import ( // RunTask FIXME ... func RunTask( ctx context.Context, - cfg *viper.Viper, + _ *viper.Viper, logger core.Logger, db *gorm.DB, progress chan core.RunningProgress, @@ -112,6 +112,7 @@ func RunTask( config.GetConfig(), logger.Nested(task.Plugin), db, + task.ID, task.Plugin, subtasks, options, @@ -126,6 +127,7 @@ func RunPluginTask( cfg *viper.Viper, logger core.Logger, db *gorm.DB, + taskID uint64, name string, subtasks []string, options map[string]interface{}, @@ -145,6 +147,7 @@ func RunPluginTask( cfg, logger, db, + taskID, name, subtasks, options, @@ -159,8 +162,9 @@ func RunPluginSubTasks( cfg *viper.Viper, logger core.Logger, db *gorm.DB, + taskID uint64, name string, - subtasks []string, + subtaskNames []string, options map[string]interface{}, pluginTask core.PluginTask, progress chan core.RunningProgress, @@ -181,10 +185,10 @@ func RunPluginSubTasks( */ // user specifies what subtasks to run - if len(subtasks) != 0 { + if len(subtaskNames) != 0 { // decode user specified subtasks var specifiedTasks []string - err := mapstructure.Decode(subtasks, &specifiedTasks) + err := mapstructure.Decode(subtaskNames, &specifiedTasks) if err != nil { return err } @@ -231,7 +235,11 @@ func RunPluginSubTasks( // execute subtasks in order taskCtx.SetProgress(0, steps) - i := 0 + subtaskNumber := 0 + var subtasks []*models.Subtask + defer func() { + recordSubtasks(logger, db, subtasks) + }() for _, subtaskMeta := range subtaskMetas { subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name) if err != nil { @@ -245,15 +253,16 @@ func RunPluginSubTasks( // run subtask logger.Info("executing subtask %s", subtaskMeta.Name) - i++ + subtaskNumber++ if progress != nil { progress <- core.RunningProgress{ Type: core.SetCurrentSubTask, SubTaskName: subtaskMeta.Name, - SubTaskNumber: i, + SubTaskNumber: subtaskNumber, } } - err = subtaskMeta.EntryPoint(subtaskCtx) + subtask, err := runSubtask(taskID, subtaskCtx, subtaskMeta.EntryPoint) + subtasks = append(subtasks, subtask) if err != nil { return &errors.SubTaskError{ SubTaskName: subtaskMeta.Name, @@ -292,3 +301,28 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, taskId uint64, progre progressDetail.SubTaskNumber = p.SubTaskNumber } } + +func runSubtask( + parentID uint64, + ctx core.SubTaskContext, + entryPoint core.SubTaskEntryPoint, +) (*models.Subtask, error) { + beginAt := time.Now() + subtask := &models.Subtask{ + Name: ctx.GetName(), + TaskID: parentID, + BeganAt: &beginAt, + } + defer func() { + finishedAt := time.Now() + subtask.FinishedAt = &finishedAt + subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix() + }() + return subtask, entryPoint(ctx) +} + +func recordSubtasks(logger core.Logger, db *gorm.DB, subtasks []*models.Subtask) { + if err := db.Create(&subtasks).Error; err != nil { + logger.Error("error writing %d subtask statuses to DB: %v", len(subtasks), err) + } +} From 895b1a6edab03c635067526073fca5cd98b315d5 Mon Sep 17 00:00:00 2001 From: Keon Amini Date: Mon, 1 Aug 2022 18:47:12 -0500 Subject: [PATCH 2/3] feat: add subtask number to subtask model --- models/migrationscripts/20220711_add_subtasks_table.go | 1 + models/task.go | 1 + runner/run_task.go | 4 +++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/models/migrationscripts/20220711_add_subtasks_table.go b/models/migrationscripts/20220711_add_subtasks_table.go index cec232a683a..3798289f883 100644 --- a/models/migrationscripts/20220711_add_subtasks_table.go +++ b/models/migrationscripts/20220711_add_subtasks_table.go @@ -15,6 +15,7 @@ type Subtask20220711 struct { commonArchived.Model TaskID uint64 `json:"task_id" gorm:"index"` SubtaskName string `json:"name" gorm:"column:name;index"` + Number int `json:"number"` BeganAt *time.Time `json:"beganAt"` FinishedAt *time.Time `json:"finishedAt" gorm:"index"` SpentSeconds int64 `json:"spentSeconds"` diff --git a/models/task.go b/models/task.go index 90a2f501222..12005763336 100644 --- a/models/task.go +++ b/models/task.go @@ -72,6 +72,7 @@ type Subtask struct { common.Model TaskID uint64 `json:"task_id" gorm:"index"` Name string `json:"name" gorm:"index"` + Number int `json:"number"` BeganAt *time.Time `json:"beganAt"` FinishedAt *time.Time `json:"finishedAt" gorm:"index"` SpentSeconds int64 `json:"spentSeconds"` diff --git a/runner/run_task.go b/runner/run_task.go index 6cf1e99ec52..d3e11c6ec9f 100644 --- a/runner/run_task.go +++ b/runner/run_task.go @@ -261,7 +261,7 @@ func RunPluginSubTasks( SubTaskNumber: subtaskNumber, } } - subtask, err := runSubtask(taskID, subtaskCtx, subtaskMeta.EntryPoint) + subtask, err := runSubtask(taskID, subtaskNumber, subtaskCtx, subtaskMeta.EntryPoint) subtasks = append(subtasks, subtask) if err != nil { return &errors.SubTaskError{ @@ -304,6 +304,7 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, taskId uint64, progre func runSubtask( parentID uint64, + subtaskNumber int, ctx core.SubTaskContext, entryPoint core.SubTaskEntryPoint, ) (*models.Subtask, error) { @@ -311,6 +312,7 @@ func runSubtask( subtask := &models.Subtask{ Name: ctx.GetName(), TaskID: parentID, + Number: subtaskNumber, BeganAt: &beginAt, } defer func() { From bb23518460d13e0e9ca582659499e38f4ce21a15 Mon Sep 17 00:00:00 2001 From: Keon Amini Date: Mon, 1 Aug 2022 22:33:19 -0500 Subject: [PATCH 3/3] feat: Remove batched saving in favor of sequential closes #2433 --- runner/run_task.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/runner/run_task.go b/runner/run_task.go index d3e11c6ec9f..55ec737c209 100644 --- a/runner/run_task.go +++ b/runner/run_task.go @@ -236,10 +236,6 @@ func RunPluginSubTasks( // execute subtasks in order taskCtx.SetProgress(0, steps) subtaskNumber := 0 - var subtasks []*models.Subtask - defer func() { - recordSubtasks(logger, db, subtasks) - }() for _, subtaskMeta := range subtaskMetas { subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name) if err != nil { @@ -261,8 +257,7 @@ func RunPluginSubTasks( SubTaskNumber: subtaskNumber, } } - subtask, err := runSubtask(taskID, subtaskNumber, subtaskCtx, subtaskMeta.EntryPoint) - subtasks = append(subtasks, subtask) + err = runSubtask(logger, db, taskID, subtaskNumber, subtaskCtx, subtaskMeta.EntryPoint) if err != nil { return &errors.SubTaskError{ SubTaskName: subtaskMeta.Name, @@ -303,11 +298,13 @@ func UpdateProgressDetail(db *gorm.DB, logger core.Logger, taskId uint64, progre } func runSubtask( + logger core.Logger, + db *gorm.DB, parentID uint64, subtaskNumber int, ctx core.SubTaskContext, entryPoint core.SubTaskEntryPoint, -) (*models.Subtask, error) { +) error { beginAt := time.Now() subtask := &models.Subtask{ Name: ctx.GetName(), @@ -319,12 +316,13 @@ func runSubtask( finishedAt := time.Now() subtask.FinishedAt = &finishedAt subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix() + recordSubtask(logger, db, subtask) }() - return subtask, entryPoint(ctx) + return entryPoint(ctx) } -func recordSubtasks(logger core.Logger, db *gorm.DB, subtasks []*models.Subtask) { - if err := db.Create(&subtasks).Error; err != nil { - logger.Error("error writing %d subtask statuses to DB: %v", len(subtasks), err) +func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) { + if err := db.Create(&subtask).Error; err != nil { + logger.Error("error writing subtask %d status to DB: %v", subtask.ID, err) } }