/
lift.go
85 lines (78 loc) · 1.9 KB
/
lift.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package pipe
import (
"github.com/akaspin/soil/agent/bus"
"sync"
)
// Lift pipe holds set catalog of consumed map[string]string messages and
// propagates them to downstream as name:map[message-id+k]=v. To reset catalog
// pipe send map[id]map[k]string message with empty message id.
type Lift struct {
name string
consumer bus.Consumer
mu sync.Mutex
catalog map[string]bus.Payload
}
func NewLift(name string, consumer bus.Consumer) (p *Lift) {
p = &Lift{
name: name,
consumer: consumer,
catalog: map[string]bus.Payload{},
}
return
}
func (p *Lift) ConsumeMessage(message bus.Message) (err error) {
if message.Topic() == "" {
err = p.consumeReset(message)
return
}
err = p.consumeOne(message)
return
}
func (p *Lift) consumeReset(message bus.Message) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
var ingest map[string]map[string]string
if err = message.Payload().Unmarshal(&ingest); err != nil {
return
}
catalog := map[string]bus.Payload{}
for id, val := range ingest {
catalog[id] = bus.NewPayload(val)
}
p.catalog = catalog
err = p.consumer.ConsumeMessage(p.makeMessage())
return
}
func (p *Lift) consumeOne(message bus.Message) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
topic := message.Topic()
if current, exists := p.catalog[topic]; exists {
if message.Payload().IsEmpty() {
delete(p.catalog, topic)
err = p.consumer.ConsumeMessage(p.makeMessage())
return
}
if current.Hash() == message.Payload().Hash() {
return
}
}
if message.Payload().IsEmpty() {
return
}
p.catalog[topic] = message.Payload()
err = p.consumer.ConsumeMessage(p.makeMessage())
return
}
func (p *Lift) makeMessage() (res bus.Message) {
fields := map[string]string{}
for root, payload := range p.catalog {
var data map[string]string
payload.Unmarshal(&data)
for k, v := range data {
fields[root+"."+k] = v
}
}
res = bus.NewMessage(p.name, fields)
return
}