/
job.go
86 lines (73 loc) · 2.02 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package cron
import (
"context"
"time"
"github.com/robfig/cron/v3"
"github.com/uber-go/tally/v4"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"golang.org/x/xerrors"
"github.com/coinbase/chainstorage/internal/config"
"github.com/coinbase/chainstorage/internal/utils/instrument"
"github.com/coinbase/chainstorage/internal/utils/log"
)
// Job adapts a Task to the cron.Job interface. It carries everything Run
// needs, because Run itself takes no arguments.
type Job struct {
	// ctx is handed to the task on every Run invocation.
	ctx context.Context
	// logger is used for the job's own messages (e.g. skipped runs).
	logger *zap.Logger
	// instrument wraps each task execution.
	instrument instrument.Instrument
	// task is the unit of work executed by Run.
	task Task
	// semaphore bounds the number of concurrently running task executions.
	semaphore *semaphore.Weighted
}
const (
	// delayStartDurationLocal replaces the task-provided start delay when
	// the configured environment is config.EnvLocal.
	delayStartDurationLocal = 10 * time.Second

	// loggerMsg is the message argument passed to instrument.WithLogger.
	loggerMsg = "cron.job"

	// taskTag is the log field key under which the task name is attached to
	// the instrument's logger.
	taskTag = "task"
)
// Compile-time assertion that *Job satisfies the cron.Job interface.
var _ cron.Job = (*Job)(nil)
// NewJob builds a Job that runs task with at most task.Parallelism()
// concurrent executions.
//
// Every semaphore permit is acquired up front and only released once the start
// delay has elapsed, so Run skips the task until then. The delay is
// task.DelayStartDuration(), or delayStartDurationLocal when cfg.Env() is
// config.EnvLocal.
//
// An error is returned when the task's parallelism is not positive or when the
// initial semaphore acquisition fails.
func NewJob(ctx context.Context, cfg *config.Config, logger *zap.Logger, metrics tally.Scope, task Task) (*Job, error) {
	limit := task.Parallelism()
	if limit <= 0 {
		return nil, xerrors.Errorf("invalid parallelism: %v", limit)
	}

	// Hold all permits so that no run can start before the delay is over.
	permits := semaphore.NewWeighted(limit)
	if err := permits.Acquire(ctx, limit); err != nil {
		return nil, xerrors.Errorf("failed to acquire the semaphore: %w", err)
	}

	name := task.Name()

	// Delay the job to prevent false alarms during the deployment.
	delay := task.DelayStartDuration()
	if cfg.Env() == config.EnvLocal {
		delay = delayStartDurationLocal
	}

	// The timer is armed here, before the goroutine is scheduled, so the
	// delay is measured from the call to NewJob.
	ready := time.After(delay)
	go func() {
		logger.Info("delay start", zap.String("duration", delay.String()))
		<-ready
		permits.Release(limit)
	}()

	jobLogger := log.WithPackage(logger)
	taskLogger := logger.With(zap.String(taskTag, name))
	inst := instrument.New(
		metrics,
		name,
		instrument.WithLogger(taskLogger, loggerMsg),
	)

	job := &Job{
		ctx:        ctx,
		logger:     jobLogger,
		instrument: inst,
		task:       task,
		semaphore:  permits,
	}
	return job, nil
}
// Run implements cron.Job. It executes the task once, wrapped by the job's
// instrument, provided a semaphore permit is available without blocking.
// Otherwise (all permits are held by in-flight runs, or by the start delay set
// up in NewJob) the invocation is skipped and a "skipped task" message is
// logged.
func (j *Job) Run() {
	name := j.task.Name()

	if !j.semaphore.TryAcquire(1) {
		j.logger.Info("skipped task", zap.String("task", name))
		return
	}
	defer j.semaphore.Release(1)

	// The error is deliberately discarded: Run has no way to return it.
	// NOTE(review): presumably the instrument records/logs failures; confirm.
	_ = j.instrument.Instrument(j.ctx, func(ctx context.Context) error {
		return j.task.Run(ctx)
	})
}