From 269e3061606e7fef7a921e9d63626e62d5ed5414 Mon Sep 17 00:00:00 2001 From: Lynwee <1507509064@qq.com> Date: Sat, 14 Sep 2024 15:32:42 +0800 Subject: [PATCH 1/2] fix(framework): fix finished_record count in _devlake_subtasks (#8054) --- backend/core/runner/run_task.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 02d92c34c8f..0db51f574b8 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -369,19 +369,17 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta progressDetail.TotalRecords = p.Total case plugin.SubTaskIncProgress: progressDetail.FinishedRecords = p.Current - // update subtask progress - where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) - err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, - }, where) - if err != nil { - basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") - } case plugin.SetCurrentSubTask: progressDetail.SubTaskName = p.SubTaskName progressDetail.SubTaskNumber = p.SubTaskNumber - default: - return + } + // update subtask progress + where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) + err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, + }, where) + if err != nil { + basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") } } @@ -417,7 +415,7 @@ func recordSubtask(basicRes context.BasicRes, subtask *models.Subtask) { {ColumnName: "began_at", Value: subtask.BeganAt}, {ColumnName: "finished_at", Value: subtask.FinishedAt}, {ColumnName: "spent_seconds", Value: subtask.SpentSeconds}, - {ColumnName: "finished_records", Value: subtask.FinishedRecords}, + //{ColumnName: "finished_records", Value: subtask.FinishedRecords}, // FinishedRecords is zero always. {ColumnName: "number", Value: subtask.Number}, }, where); err != nil { basicRes.GetLogger().Error(err, "error writing subtask %d status to DB: %v", subtask.ID) From 1631d66e2f939738ffb0a75ee59b193382d7e074 Mon Sep 17 00:00:00 2001 From: NaRro Date: Wed, 23 Oct 2024 10:31:08 +0800 Subject: [PATCH 2/2] feat: not update sub task progress if progress less than 1 pct (#8152) [Refactor][core]Data inflation when using postgres #8142 --- backend/core/runner/run_task.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 0db51f574b8..8a3515768a7 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -20,10 +20,11 @@ package runner import ( gocontext "context" "fmt" - "github.com/apache/incubator-devlake/core/models/common" "strings" "time" + "github.com/apache/incubator-devlake/core/models/common" + "github.com/apache/incubator-devlake/core/context" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -353,6 +354,7 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta Model: common.Model{ID: taskId}, } subtask := &models.Subtask{} + originalFinishedRecords := progressDetail.FinishedRecords switch p.Type { case plugin.TaskSetProgress: progressDetail.TotalSubTasks = p.Total @@ -372,14 +374,22 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta case plugin.SetCurrentSubTask: progressDetail.SubTaskName = p.SubTaskName progressDetail.SubTaskNumber = p.SubTaskNumber + // reset finished records + progressDetail.FinishedRecords = 0 } - // update subtask progress - where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) - err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, - }, where) - if err != nil { - basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + currentFinishedRecords := progressDetail.FinishedRecords + currentTotalRecords := progressDetail.TotalRecords + // update progress if progress is more than 1% + // or there is progress if no total record provided + if (currentTotalRecords > 0 && float64(currentFinishedRecords-originalFinishedRecords)/float64(currentTotalRecords) > 0.01) || (currentTotalRecords <= 0 && currentFinishedRecords > originalFinishedRecords) { + // update subtask progress + where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) + err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, + }, where) + if err != nil { + basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + } } }