Skip to content

Commit

Permalink
Merge pull request #14457 from ywk253100/210316_cpu
Browse files Browse the repository at this point in the history
[cherry-pick]Fix the consume too much CPU issue
  • Loading branch information
ywk253100 committed Mar 17, 2021
2 parents f0b241c + 8b1817b commit 634be34
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
4 changes: 4 additions & 0 deletions make/migrations/postgresql/0051_2.2.1_schema.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/*fixes #14358*/
UPDATE execution SET status='Success' WHERE status='Succeed';

CREATE INDEX IF NOT EXISTS task_execution_id_idx ON task (execution_id);
63 changes: 43 additions & 20 deletions src/pkg/task/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,54 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
}

func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error {
count := executionSweeperCount[vendorType]
if count == 0 {
log.Debugf("the execution sweeper count doesn't set for %s, skip sweep", vendorType)
size := int64(executionSweeperCount[vendorType])
if size == 0 {
log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType)
return nil
}

for {
// the function "List" of the execution manager returns the execution records
// ordered by start time. After the sorting is supported in query, we should
// specify the sorting explicitly
// the execution records in second page are always the candidates should to be swept
executions, err := e.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
"VendorID": vendorID,
},
PageNumber: 2,
PageSize: int64(count),
})
// get the #size execution record
query := &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
"VendorID": vendorID,
},
Sorts: []*q.Sort{
{
Key: "StartTime",
DESC: true,
}},
PageSize: 1,
PageNumber: size,
}
executions, err := e.executionDAO.List(ctx, query)
if err != nil {
return err
}
// list is null means that the execution count < size, return directly
if len(executions) == 0 {
return nil
}

query.Keywords["StartTime"] = &q.Range{
Max: executions[0].StartTime,
}
totalOfCandidate, err := e.executionDAO.Count(ctx, query)
if err != nil {
return err
}
// n is the page count of all candidates
n := totalOfCandidate / 1000
if totalOfCandidate%1000 > 0 {
n = n + 1
}
query.PageSize = 1000
for i := n; i >= 1; i-- {
query.PageNumber = i
executions, err := e.List(ctx, query)
if err != nil {
return err
}
// no execution records need to be swept, return directly
if len(executions) == 0 {
return nil
}
for _, execution := range executions {
// if the status of the execution isn't final, skip
if !job.Status(execution.Status).Final() {
Expand All @@ -176,6 +198,7 @@ func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorI
}
}
}
return nil
}

func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {
Expand Down

0 comments on commit 634be34

Please sign in to comment.