/
queue.go
122 lines (95 loc) · 2.7 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package types
import (
"database/sql/driver"
"encoding/json"
"time"
"github.com/cortezaproject/corteza/server/pkg/sql"
"github.com/cortezaproject/corteza/server/pkg/filter"
"github.com/spf13/cast"
)
// Queue model types: the queue record itself, the filters used to look up
// queues and queue messages, the message record, and the queue metadata.
type (
// Queue is a queue record: a consumer name, a queue name, its metadata, and
// created/updated/deleted timestamps with the matching *By identifiers.
Queue struct {
// ID is encoded in JSON as a string under the "queueID" key.
ID uint64 `json:"queueID,string"`
Consumer string `json:"consumer"`
Queue string `json:"queue"`
// Meta is encoded and decoded through QueueMeta's custom JSON methods.
Meta QueueMeta `json:"meta"`
// NOTE(review): omitempty has no effect on a non-pointer time.Time with
// encoding/json, so createdAt is always emitted.
CreatedAt time.Time `json:"createdAt,omitempty"`
CreatedBy uint64 `json:"createdBy,string" `
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
UpdatedBy uint64 `json:"updatedBy,string,omitempty" `
DeletedAt *time.Time `json:"deletedAt,omitempty"`
DeletedBy uint64 `json:"deletedBy,string,omitempty" `
}
// QueueFilter holds the criteria for looking up Queue resources, together
// with the embedded sorting and paging parameters.
QueueFilter struct {
QueueID []string `json:"queueID"`
Query string `json:"query"`
// Deleted presumably selects how deleted queues are treated; see
// filter.State for the exact semantics.
Deleted filter.State `json:"deleted"`
// Check is called by the store backend for each resource found. The
// function can modify the resource and return false if the store should
// not return it.
//
// The store then loads additional resources to satisfy the paging
// parameters.
Check func(*Queue) (bool, error) `json:"-"`
filter.Sorting
filter.Paging
}
// QueueMessage is a single message stored on a queue.
QueueMessage struct {
// ID is encoded as a JSON number under "messageID" (no ",string" option,
// unlike Queue.ID).
ID uint64 `json:"messageID"`
Queue string `json:"queue"`
// Payload is a raw byte slice; encoding/json writes it as a base64 string.
Payload []byte `json:"payload"`
Created *time.Time `json:"created"`
Processed *time.Time `json:"processed"`
}
// QueueMessageFilter holds the criteria for looking up QueueMessage
// resources, together with the embedded sorting and paging parameters.
QueueMessageFilter struct {
// Queue has no JSON tag, so encoding/json uses the field name "Queue".
Queue string
Processed filter.State `json:"processed"`
filter.Sorting
filter.Paging
}
// QueueMeta is the per-queue metadata. It has custom JSON methods
// (MarshalJSON/UnmarshalJSON) that represent PollDelay as a string.
QueueMeta struct {
// PollDelay may be nil; MarshalJSON then writes an empty string.
PollDelay *time.Duration `json:"poll_delay"`
// DispatchEvents is the value reported by Queue.CanDispatch.
DispatchEvents bool `json:"dispatch_events"`
}
)
// UnmarshalJSON decodes queue metadata from JSON.
//
// "poll_delay" is read as a string and converted with cast.ToDurationE. When
// the conversion fails (the empty string written by MarshalJSON for a nil
// delay is one such case) PollDelay is left as it was and no error is
// reported. DispatchEvents is reset to false before decoding, so it stays
// false when the key is absent from the input.
//
// An error is returned only when the JSON itself cannot be decoded.
func (m *QueueMeta) UnmarshalJSON(s []byte) error {
	// Plain has QueueMeta's fields but none of its methods, so decoding into
	// it does not recurse back into this function.
	type Plain QueueMeta

	// The outer PollDelay shadows the embedded one, capturing the raw string
	// while the remaining keys land directly in m through the embedded pointer.
	var raw struct {
		PollDelay string `json:"poll_delay"`
		*Plain
	}
	raw.Plain = (*Plain)(m)

	// Default applied before decoding.
	m.DispatchEvents = false

	if err := json.Unmarshal(s, &raw); err != nil {
		return err
	}

	delay, err := cast.ToDurationE(raw.PollDelay)
	if err != nil {
		// Unparsable delay: keep the current PollDelay and carry on.
		return nil
	}

	m.PollDelay = &delay
	return nil
}
// Scan fills the metadata from a value read from the database by handing src
// to sql.ParseJSON (the project's pkg/sql helper, not database/sql) and
// returning its result unchanged.
func (m *QueueMeta) Scan(src any) error {
	err := sql.ParseJSON(src, m)
	return err
}
// Value returns the metadata as JSON bytes for storage. Encoding goes through
// json.Marshal and therefore through QueueMeta.MarshalJSON.
func (m QueueMeta) Value() (driver.Value, error) {
	encoded, err := json.Marshal(m)
	return encoded, err
}
// MarshalJSON encodes the metadata as a JSON object with two keys:
// "poll_delay", holding the duration formatted by time.Duration.String (or an
// empty string when PollDelay is nil), and "dispatch_events".
func (m QueueMeta) MarshalJSON() ([]byte, error) {
	// wire is the on-the-wire shape; PollDelay is a string here, not a pointer.
	type wire struct {
		PollDelay      string `json:"poll_delay"`
		DispatchEvents bool   `json:"dispatch_events"`
	}

	out := wire{DispatchEvents: m.DispatchEvents}
	if delay := m.PollDelay; delay != nil {
		out.PollDelay = delay.String()
	}

	return json.Marshal(out)
}
// CanDispatch reports the value of the queue's Meta.DispatchEvents flag.
func (q *Queue) CanDispatch() bool {
	enabled := q.Meta.DispatchEvents
	return enabled
}
// ParseQueueMeta decodes queue metadata from the first element of ss, which
// is expected to hold a JSON document; any further elements are ignored.
//
// A nil or empty ss yields a zero QueueMeta and a nil error. When decoding
// fails the error from json.Unmarshal is returned and the accompanying
// QueueMeta should not be relied upon.
func ParseQueueMeta(ss []string) (p QueueMeta, err error) {
	if len(ss) == 0 {
		return p, nil
	}

	// Decode first, return afterwards: writing `return p, json.Unmarshal(.., &p)`
	// would read p and mutate p within one return statement, and the Go spec
	// does not define the order between a variable read and a function call.
	err = json.Unmarshal([]byte(ss[0]), &p)
	return p, err
}