forked from tarantool/go-tarantool
/
task.go
121 lines (102 loc) · 2.59 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package queue
import (
"fmt"
"time"
"github.com/vmihailenco/msgpack/v5"
)
// Task describes a single task stored in a tube of a Tarantool queue.
type Task struct {
	// id is the task identifier, decoded from the first element of the
	// msgpack array.
	id uint64
	// status is the task status, decoded from the second element and
	// replaced after every successful operation on the task.
	status string
	// data is the task payload. If it is non-nil before decoding,
	// DecodeMsgpack decodes the payload into it.
	data interface{}
	// q is the queue through which the task operations are performed.
	q *queue
}
// DecodeMsgpack reads a task from d. The encoded form must be an array of at
// least three elements: the task id, the task status and the task payload, in
// that order. Elements beyond the third are not read by this method.
//
// If t.data is non-nil when the method is called, the payload is decoded into
// it; otherwise the payload is decoded into a generic value and stored in
// t.data.
//
// It returns an error if the array is shorter than three elements or if any
// element fails to decode.
func (t *Task) DecodeMsgpack(d *msgpack.Decoder) error {
	l, err := d.DecodeArrayLen()
	if err != nil {
		return err
	}
	if l < 3 {
		return fmt.Errorf("array len doesn't match: %d", l)
	}
	if t.id, err = d.DecodeUint64(); err != nil {
		return err
	}
	if t.status, err = d.DecodeString(); err != nil {
		return err
	}
	if t.data != nil {
		// Report a failed decode into the preset destination instead of
		// silently returning a task whose payload was never filled in.
		return d.Decode(t.data)
	}
	if t.data, err = d.DecodeInterface(); err != nil {
		return err
	}
	return nil
}
// Id returns the identifier of the task.
func (t *Task) Id() uint64 {
	return t.id
}
// Data returns the payload of the task.
func (t *Task) Data() interface{} {
	return t.data
}
// Status returns the last known status of the task.
func (t *Task) Status() string {
	return t.status
}
// Touch increases the ttr of a running task by increment. On success the
// task status is replaced with the one returned by the queue.
func (t *Task) Touch(increment time.Duration) error {
	status, err := t.q._touch(t.id, increment)
	return t.accept(status, err)
}
// Ack signals that the task has been completed. On success the task status
// is replaced with the one returned by the queue.
func (t *Task) Ack() error {
	status, err := t.q._ack(t.id)
	return t.accept(status, err)
}
// Delete removes the task from the queue. On success the task status is
// replaced with the one returned by the queue.
func (t *Task) Delete() error {
	status, err := t.q._delete(t.id)
	return t.accept(status, err)
}
// Bury signals that the task cannot be executed in the current
// circumstances. The task becomes "buried", i.e. neither completed nor
// ready, so it cannot be deleted or taken by another worker.
// To revert burying, call queue.Kick(numberOfBuried).
func (t *Task) Bury() error {
	status, err := t.q._bury(t.id)
	return t.accept(status, err)
}
// Release puts the task back in the queue without marking it complete.
// In other words, this worker failed to complete the task, so another
// worker can try to do it again. It is equivalent to ReleaseCfg with empty
// options.
func (t *Task) Release() error {
	return t.ReleaseCfg(Opts{})
}
// ReleaseCfg puts the task back in the queue and changes its configuration
// according to cfg. On success the task status is replaced with the one
// returned by the queue.
func (t *Task) ReleaseCfg(cfg Opts) error {
	status, err := t.q._release(t.id, cfg)
	return t.accept(status, err)
}
// accept stores newStatus as the task status when err is nil and leaves the
// status untouched otherwise. In both cases it returns err, so the result
// pair of a queue operation can be passed straight through it.
func (t *Task) accept(newStatus string, err error) error {
	if err != nil {
		return err
	}
	t.status = newStatus
	return nil
}
// IsReady reports whether the task status is READY.
func (t *Task) IsReady() bool {
	return t.status == READY
}
// IsTaken reports whether the task status is TAKEN.
func (t *Task) IsTaken() bool {
	return t.status == TAKEN
}
// IsDone reports whether the task status is DONE.
func (t *Task) IsDone() bool {
	return t.status == DONE
}
// IsBuried reports whether the task status is BURIED.
func (t *Task) IsBuried() bool {
	return t.status == BURIED
}
// IsDelayed reports whether the task status is DELAYED.
func (t *Task) IsDelayed() bool {
	return t.status == DELAYED
}