-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.goTestTopicMessageHandler.go
70 lines (54 loc) · 1.65 KB
/
handler.goTestTopicMessageHandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package test
import (
"context"
"fmt"
"reflect"
"github.com/Bofry/trace"
nsq "github.com/Bofry/worker-nsq"
"github.com/Bofry/worker-nsq/tracing"
)
// Compile-time checks that *GoTestTopicMessageHandler satisfies the worker-nsq
// interfaces it is registered as. Typed nil pointers are used so the checks
// need no instance of the handler.
var (
	_ nsq.MessageHandler        = (*GoTestTopicMessageHandler)(nil)
	_ nsq.MessageObserverAffair = (*GoTestTopicMessageHandler)(nil)
)
// GoTestTopicMessageHandler is a test message handler for the NSQ worker.
// It is asserted in this file to implement nsq.MessageHandler and
// nsq.MessageObserverAffair.
type GoTestTopicMessageHandler struct {
// ServiceProvider is not read by any method visible in this file;
// presumably it is populated by the hosting framework — TODO confirm.
ServiceProvider *ServiceProvider
// counter is created by Init and incremented by ProcessMessage for every
// message whose topic is not "gotest2Topic".
counter *GoTestTopicMessageCounter
}
// Init prints a marker line to standard output and installs a fresh,
// zero-valued message counter on the handler.
func (h *GoTestTopicMessageHandler) Init() {
	fmt.Println("GoTestTopicMessageHandler.Init()")

	h.counter = &GoTestTopicMessageCounter{}
}
// ProcessMessage handles a single NSQ message.
//
// It logs the message's topic, NSQD address, ID and body through the context
// logger, then records the body as the argument of the span carried by ctx.
// Messages on topic "gotest2Topic" run doSomething and are handed to
// ctx.InvalidMessage, whose result is returned. Every other message increments
// the handler's counter and yields a nil error.
func (h *GoTestTopicMessageHandler) ProcessMessage(ctx *nsq.Context, message *nsq.Message) error {
	body := string(message.Body)
	ctx.Logger().Printf("Message on %s (%s): [%s] %v\n", message.Topic, message.NSQDAddress, message.ID, body)

	span := trace.SpanFromContext(ctx)
	span.Argv(body)
	spanCtx := span.Context()

	if message.Topic != "gotest2Topic" {
		h.counter.increase(spanCtx)
		// NOTE: the parent ProcessMessage will call message.Finish() automatically
		return nil
	}

	h.doSomething(spanCtx)
	return ctx.InvalidMessage(message)
}
// MessageObserverTypes implements internal.MessageObserverAffair.
//
// It returns a one-element slice holding the type reported by
// MessageObserverManager.GoTestStreamMessageObserver.
func (*GoTestTopicMessageHandler) MessageObserverTypes() []reflect.Type {
	observerType := MessageObserverManager.GoTestStreamMessageObserver.Type()
	return []reflect.Type{observerType}
}
// doSomething starts a span named "doSomething()" under ctx using the tracer
// obtained for the handler, and ends it when the function returns. It performs
// no other work.
func (h *GoTestTopicMessageHandler) doSomething(ctx context.Context) {
	span := tracing.GetTracer(h).Start(ctx, "doSomething()")
	defer span.End()
}
// GoTestTopicMessageCounter keeps a running tally that is advanced by its
// increase method.
type GoTestTopicMessageCounter struct {
// count is the number of times increase has been called on this counter.
// NOTE(review): the field is a plain int with no synchronisation visible
// here — confirm whether handlers can run concurrently.
count int
}
// increase adds one to the counter inside a span named "increase()" started
// under ctx, and returns the updated count. The span is ended when the
// function returns.
func (c *GoTestTopicMessageCounter) increase(ctx context.Context) int {
	span := tracing.GetTracer(c).Start(ctx, "increase()")
	defer span.End()

	c.count += 1
	return c.count
}