Skip to content

Commit

Permalink
feat: store preheat result (#1516)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Aug 2, 2022
1 parent d2363bf commit 9b0fd1a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 34 deletions.
6 changes: 6 additions & 0 deletions internal/dflog/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func WithHostnameAndIP(hostname, ip string) *SugaredLoggerOnWith {
}
}

func WithTaskAndJobID(taskID, jobID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"taskID", taskID, "jobID", jobID},
}
}

func (log *SugaredLoggerOnWith) With(args ...any) *SugaredLoggerOnWith {
args = append(args, log.withArgs...)
return &SugaredLoggerOnWith{
Expand Down
14 changes: 12 additions & 2 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@

package job

// Queue Name
// Queue Name.
const (
GlobalQueue = Queue("global")
SchedulersQueue = Queue("schedulers")
)

// Job Name
// Job Name.
const (
PreheatJob = "preheat"
)

// Machinery server configuration.
const (
DefaultResultsExpireIn = 86400
DefaultRedisMaxIdle = 10
DefaultRedisIdleTimeout = 300
DefaultRedisReadTimeout = 60
DefaultRedisWriteTimeout = 60
DefaultRedisConnectTimeout = 60
)
21 changes: 13 additions & 8 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
"github.com/go-redis/redis/v8"
)

const (
DefaultResultsExpireIn = 86400
)

type Config struct {
Host string
Port int
Expand Down Expand Up @@ -71,14 +67,19 @@ func New(cfg *Config, queue Queue) (*Job, error) {
return nil, err
}

var cnf = &machineryv1config.Config{
server, err := machinery.NewServer(&machineryv1config.Config{
Broker: broker,
DefaultQueue: queue.String(),
ResultBackend: backend,
ResultsExpireIn: DefaultResultsExpireIn,
}

server, err := machinery.NewServer(cnf)
Redis: &machineryv1config.RedisConfig{
MaxIdle: DefaultRedisMaxIdle,
IdleTimeout: DefaultRedisIdleTimeout,
ReadTimeout: DefaultRedisReadTimeout,
WriteTimeout: DefaultRedisWriteTimeout,
ConnectTimeout: DefaultRedisConnectTimeout,
},
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,6 +108,7 @@ type GroupJobState struct {
GroupUUID string
State string
CreatedAt time.Time
JobStates []*machineryv1tasks.TaskState
}

func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
Expand All @@ -125,6 +127,7 @@ func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
GroupUUID: groupUUID,
State: machineryv1tasks.StateFailure,
CreatedAt: jobState.CreatedAt,
JobStates: jobStates,
}, nil
}
}
Expand All @@ -135,6 +138,7 @@ func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
GroupUUID: groupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: jobState.CreatedAt,
JobStates: jobStates,
}, nil
}
}
Expand All @@ -143,6 +147,7 @@ func (t *Job) GetGroupJobState(groupUUID string) (*GroupJobState, error) {
GroupUUID: groupUUID,
State: machineryv1tasks.StateSuccess,
CreatedAt: jobStates[0].CreatedAt,
JobStates: jobStates,
}, nil
}

Expand Down
13 changes: 0 additions & 13 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,3 @@ func New(cfg *config.Config) (*Job, error) {
Preheat: p,
}, nil
}

func (j *Job) GetGroupJobState(id string) (*internaljob.GroupJobState, error) {
groupJobState, err := j.Job.GetGroupJobState(id)
if err != nil {
return nil, err
}

return &internaljob.GroupJobState{
GroupUUID: groupJobState.GroupUUID,
State: groupJobState.State,
CreatedAt: groupJobState.CreatedAt,
}, nil
}
39 changes: 28 additions & 11 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package service

import (
"context"
"errors"
"fmt"

machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
Expand Down Expand Up @@ -98,34 +99,50 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
}

func (s *service) pollingJob(ctx context.Context, id uint, taskID string) {
var job model.Job

var (
job model.Job
log = logger.WithTaskAndJobID(taskID, fmt.Sprint(id))
)
if _, _, err := retry.Run(ctx, 5, 10, 120, func() (any, bool, error) {
groupJob, err := s.job.GetGroupJobState(taskID)
if err != nil {
logger.Errorf("polling job %d and task %s failed: %v", id, taskID, err)
log.Errorf("polling job failed: %s", err.Error())
return nil, false, err
}

result, err := structure.StructToMap(groupJob)
if err != nil {
log.Errorf("polling job failed: %s", err.Error())
return nil, false, err
}

if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
State: groupJob.State,
State: groupJob.State,
Result: result,
}).Error; err != nil {
logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err)
log.Errorf("polling job failed: %s", err.Error())
return nil, true, err
}

switch job.State {
case machineryv1tasks.StateSuccess:
logger.Infof("polling job %d and task %s is finally successful", id, taskID)
log.Info("polling job success")
return nil, true, nil
case machineryv1tasks.StateFailure:
logger.Errorf("polling job %d and task %s is finally failed", id, taskID)
var jobStates []machineryv1tasks.TaskState
for _, jobState := range groupJob.JobStates {
jobStates = append(jobStates, *jobState)
}

log.Errorf("polling job failed: %#v", jobStates)
return nil, true, nil
default:
return nil, false, fmt.Errorf("polling job %d and task %s status is %s", id, taskID, job.State)
msg := fmt.Sprintf("unknow state %s", job.State)
log.Error(msg)
return nil, false, errors.New(msg)
}
}); err != nil {
logger.Errorf("polling job %d and task %s failed %s", id, taskID, err)
log.Errorf("polling job failed: %s", err.Error())
}

// Polling timeout and failed
Expand All @@ -134,9 +151,9 @@ func (s *service) pollingJob(ctx context.Context, id uint, taskID string) {
if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
State: machineryv1tasks.StateFailure,
}).Error; err != nil {
logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err)
log.Errorf("polling job failed: %s", err.Error())
}
logger.Errorf("polling job %d and task %s timeout", id, taskID)
log.Error("polling job timeout")
}
}

Expand Down

0 comments on commit 9b0fd1a

Please sign in to comment.