Skip to content

Commit

Permalink
Feature/cronjob (#14)
Browse files Browse the repository at this point in the history
* feat: 新增文件读取接口
* 支持任务手动下发和状态回显及日志回显
* 对接juno任务下发和执行
* fix: support for darwin os
* fix: 将任务锁修改为Job级别锁;锁失效后Job自动转移
  • Loading branch information
link-duan committed Sep 7, 2020
1 parent 2eefd12 commit 3e5bd48
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Config struct {
// DefaultConfig ...
func DefaultConfig() *Config {
return &Config{
ReqTimeout: 5,
ReqTimeout: 3,
}
}

Expand Down
40 changes: 39 additions & 1 deletion pkg/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"time"

"github.com/coreos/etcd/clientv3/concurrency"
"github.com/douyu/juno-agent/pkg/job/etcd"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -60,6 +60,12 @@ type Job struct {

// 用于访问etcd
*worker `json:"-"`
<<<<<<< feature/cronjob

mutex *etcdv3.Mutex
locked bool
=======
>>>>>>> master
}

// NewEtcdTimeoutContext return a new etcdTimeoutContext
Expand Down Expand Up @@ -204,6 +210,35 @@ func (j *Job) ValidRules() error {
return nil
}

<<<<<<< feature/cronjob
func (j *Job) Lock() error {
var err error
j.mutex, err = j.Client.NewMutex(LockKeyPrefix+j.ID, concurrency.WithTTL(10))
if err != nil {
return err
}

err = j.mutex.Lock(3 * time.Second)
if err != nil {
return err
}

j.locked = true

return nil
}

func (j *Job) Unlock() {
if j.mutex != nil {
err := j.mutex.Unlock()
if err != nil {
xlog.Error("unlock failed", xlog.FieldErr(err))
}
}
}

=======
>>>>>>> master
type Timer struct {
ID string `json:"id"`
Cron string `json:"timer"`
Expand Down Expand Up @@ -264,6 +299,8 @@ func (c *Cmd) GetID() string {
}

func (c *Cmd) Run() error {
<<<<<<< feature/cronjob
=======

if c.Job.JobType == TypeAlone {
mutex, err := etcd.NewMutex(c.Client.Client, LockKeyPrefix+c.Job.ID, concurrency.WithTTL(5))
Expand All @@ -279,6 +316,7 @@ func (c *Cmd) Run() error {
defer mutex.Unlock()
}

>>>>>>> master
if c.Job.RetryCount <= 0 {
err := c.Job.Run()
if err != nil {
Expand Down
56 changes: 53 additions & 3 deletions pkg/job/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package job
import (
"context"
"encoding/json"
"strconv"
"strings"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/douyu/juno-agent/pkg/job/etcd"
Expand All @@ -25,7 +28,6 @@ import (
"github.com/douyu/jupiter/pkg/util/xgo"
"github.com/douyu/jupiter/pkg/xlog"
"github.com/sony/sonyflake"
"strconv"
)

// Node 执行 cron 命令服务的结构体
Expand Down Expand Up @@ -68,6 +70,7 @@ func (w *worker) Run() error {
w.logger.Info("worker run...")

w.Cron.Run()
go w.watchLocks()
go w.watchJobs()
go w.watchOnce()
go w.watchExecutingProc()
Expand Down Expand Up @@ -229,6 +232,7 @@ func (w *worker) delJob(id string) {
}

delete(w.jobs, id)
job.Unlock()

cmds := job.Cmds()
if len(cmds) == 0 {
Expand All @@ -243,7 +247,6 @@ func (w *worker) delJob(id string) {

func (w *worker) modJob(job *Job) {
oJob, ok := w.jobs[job.ID]
// 之前此任务没有在当前结点执行,直接增加任务
if !ok {
w.addJob(job)
return
Expand All @@ -270,8 +273,17 @@ func (w *worker) modJob(job *Job) {
}

func (w *worker) addJob(job *Job) {
// 添加任务到当前节点
job.worker = w

if job.JobType == TypeAlone {
err := job.Lock()
if err != nil {
xlog.Info("failed to lock job. ignore it", xlog.String("jobId", job.ID))
return
}
}

// 添加任务到当前节点
w.jobs[job.ID] = job

cmds := job.Cmds()
Expand Down Expand Up @@ -366,3 +378,41 @@ func (w *worker) KillExecutingProc(process *Process) {
return
}
}

func (w *worker) watchLocks() {
wch := w.Client.Watch(context.Background(), LockKeyPrefix, clientv3.WithPrefix())

for ev := range wch {
for _, ev := range ev.Events {
switch {
case ev.Type == clientv3.EventTypeDelete:
// watch deleted job and try to lock that job
jobId := getJobIDFromLockKey(string(ev.Kv.Key))
w.tryGetJob(jobId)
}
}
}
}

func (w *worker) tryGetJob(jobId string) {
resp, err := w.Client.Get(context.Background(), JobsKeyPrefix+jobId)
if err != nil {
return
}
if len(resp.Kvs) == 0 {
return
}

jobKv := resp.Kvs[0]
job, err := w.GetJobContentFromKv(jobKv.Key, jobKv.Value)
if err != nil {
return
}

w.addJob(job)
}

func getJobIDFromLockKey(key string) (jobId string) {
key = strings.TrimLeft(key, LockKeyPrefix)
return strings.Split(key, "/")[0]
}

0 comments on commit 3e5bd48

Please sign in to comment.