-
Notifications
You must be signed in to change notification settings - Fork 0
/
def.go
131 lines (103 loc) · 2.99 KB
/
def.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package internal
import (
"context"
"log"
"os"
"reflect"
"sync/atomic"
redis "github.com/Bofry/lib-redis-stream"
"github.com/Bofry/trace"
"go.opentelemetry.io/otel/propagation"
)
// Package-wide constants used for logging and for context value lookup.
const (
// LOGGER_PREFIX is the prefix handed to log.New when RedisWorkerLogger is built.
LOGGER_PREFIX string = "[worker-redis] "
// __CONTEXT_REPLY_KEY is the only declared value of the unexported
// ctxReplyKeyType.
// NOTE(review): presumably the context key under which the reply is stored;
// confirm against the Context implementation.
__CONTEXT_REPLY_KEY ctxReplyKeyType = 0
)
// ReplyCode values. The ReplyCode type itself is declared elsewhere in the
// package. The first four values are numbered by iota, so their order in this
// block fixes their numeric value.
const (
// UNSET is ReplyCode 0 (iota starts at zero in this block).
UNSET ReplyCode = iota
// PASS is ReplyCode 1.
PASS
// FAIL is ReplyCode 2.
FAIL
// ABORT is ReplyCode 3.
ABORT
// __reply_code_minimum__ and __reply_code_maximum__ name the lowest and
// highest iota-assigned codes above (UNSET and ABORT).
// NOTE(review): presumably used for range validation of a ReplyCode; confirm
// against usages.
__reply_code_minimum__ = UNSET
__reply_code_maximum__ = ABORT
// INVALID is ReplyCode -1, which lies outside the minimum..maximum range.
INVALID ReplyCode = -1
// __reply_code_invalid_text__ is the untyped string constant "invalid".
// NOTE(review): presumably the textual form of an out-of-range ReplyCode;
// confirm against the ReplyCode string conversion.
__reply_code_invalid_text__ = "invalid"
)
// Integer codes sharing the RestrictedForwardMessage_ prefix.
// NOTE(review): presumably these identify why a forward attempt was rejected
// by the restricted message delegate; confirm against the code that uses them.
const (
// RestrictedForwardMessage_InvalidOperation has the value 0.
RestrictedForwardMessage_InvalidOperation int = 0
// RestrictedForwardMessage_Recursive has the value 1.
RestrictedForwardMessage_Recursive int = 1
)
// Package-level state: cached reflection types, tracing defaults, and shared
// helper instances.
var (
// typeOfHost is the reflect.Type of the RedisWorker struct.
typeOfHost = reflect.TypeOf(RedisWorker{})
// typeOfMessageObserverAffair is the reflect.Type of the
// MessageObserverAffair interface, obtained through a nil pointer's Elem().
typeOfMessageObserverAffair = reflect.TypeOf((*MessageObserverAffair)(nil)).Elem()
// defaultTracerProvider holds the provider returned by
// createNoopTracerProvider at package initialization.
defaultTracerProvider = createNoopTracerProvider()
// defaultTextMapPropagator holds the propagator returned by
// createNoopTextMapPropagator at package initialization.
defaultTextMapPropagator = createNoopTextMapPropagator()
// globalTracerManager is the atomic.Value read by GetTracerManager and
// written by SetTracerManager; it always contains a tracerManagerHolder.
globalTracerManager = defaultTracerManager()
// GlobalContextHelper is a zero-value ContextHelper.
GlobalContextHelper ContextHelper = ContextHelper{}
// GlobalRestrictedMessageDelegate is RestrictedMessageDelegate(0) exposed
// through the redis.MessageDelegate interface.
GlobalRestrictedMessageDelegate redis.MessageDelegate = RestrictedMessageDelegate(0)
// GlobalMessageHelper is a zero-value MessageHelper.
GlobalMessageHelper MessageHelper = MessageHelper{}
// RedisWorkerModuleInstance is a zero-value RedisWorkerModule.
RedisWorkerModuleInstance = RedisWorkerModule{}
// RedisWorkerLogger writes to os.Stdout using LOGGER_PREFIX. log.Lmsgprefix
// places the prefix directly before the message instead of at the start of
// the line.
RedisWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)
// Type declarations for the package: internal helper types, aliases of the
// lib-redis-stream types, and the handler/observer contracts.
type (
// ctxReplyKeyType is the unexported type of __CONTEXT_REPLY_KEY.
ctxReplyKeyType int
// tracerManagerHolder wraps a *TracerManager so the value stored in
// globalTracerManager always has the same concrete type.
tracerManagerHolder struct {
v *TracerManager
}
// The following are aliases (not new types) of the corresponding
// lib-redis-stream types, re-exported under this package.
UniversalOptions = redis.UniversalOptions
UniversalClient = redis.UniversalClient
XMessage = redis.XMessage
XStream = redis.XStream
Message = redis.Message
StreamOffset = redis.StreamOffset
StreamOffsetInfo = redis.StreamOffsetInfo
// MessageHandleModule is a message-processing unit that can be given a
// successor module and that receives lifecycle callbacks.
// NOTE(review): the successor methods suggest modules are linked into a
// chain; confirm against the code that wires modules together.
MessageHandleModule interface {
// CanSetSuccessor reports whether SetSuccessor may be called on this module.
CanSetSuccessor() bool
// SetSuccessor assigns the module's successor.
SetSuccessor(successor MessageHandleModule)
// ProcessMessage handles message with the given context, processing
// state and recover object.
ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
// OnInitComplete is a lifecycle callback.
// NOTE(review): presumably invoked once initialization finishes; confirm
// against the caller.
OnInitComplete()
// OnStart is a lifecycle callback that may fail with an error.
OnStart(ctx context.Context) error
// OnStop is a lifecycle callback that may fail with an error.
OnStop(ctx context.Context) error
}
// MessageObserver receives callbacks for message events and reports its
// own reflect.Type.
// NOTE(review): OnAck/OnDel presumably fire when a message is acknowledged
// or deleted; confirm against the caller.
MessageObserver interface {
OnAck(ctx *Context, message *Message)
OnDel(ctx *Context, message *Message)
Type() reflect.Type
}
// MessageObserverAffair exposes a list of reflect.Type values through
// MessageObserverTypes.
// NOTE(review): presumably the observer types an implementation is
// interested in; confirm against usages.
MessageObserverAffair interface {
MessageObserverTypes() []reflect.Type
}
// MessageHandler processes a single message within a Context.
MessageHandler interface {
ProcessMessage(ctx *Context, message *Message)
}
// ErrorHandler is a callback receiving the context, the message and an
// arbitrary error value.
// NOTE(review): err is interface{}, presumably so a recovered panic value
// can be passed; confirm against the caller.
ErrorHandler func(ctx *Context, message *Message, err interface{})
// OnHostErrorHandler is a callback receiving an error and returning a
// boolean named disposed.
// NOTE(review): presumably true means the error was handled; confirm
// against the caller.
OnHostErrorHandler func(err error) (disposed bool)
)
// createNoopTracerProvider returns the provider produced by
// trace.NoopProvider. If that call fails, the error is reported through
// RedisWorkerLogger.Fatalf, which logs the message and then exits the process.
func createNoopTracerProvider() *trace.SeverityTracerProvider {
	provider, err := trace.NoopProvider()
	if err == nil {
		return provider
	}

	// Fatalf terminates the process, so the return below only satisfies the
	// compiler.
	RedisWorkerLogger.Fatalf("cannot create NoopProvider: %v", err)
	return provider
}
// createNoopTextMapPropagator returns a composite TextMapPropagator that is
// assembled from zero underlying propagators.
func createNoopTextMapPropagator() propagation.TextMapPropagator {
	emptyComposite := propagation.NewCompositeTextMapPropagator()
	return emptyComposite
}
// defaultTracerManager allocates an atomic.Value and seeds it with a
// tracerManagerHolder wrapping a freshly created TracerManager.
func defaultTracerManager() *atomic.Value {
	box := new(atomic.Value)

	// The manager is always stored inside a holder so that every Store on
	// this atomic.Value uses the same concrete type.
	initial := tracerManagerHolder{v: NewTraceManager()}
	box.Store(initial)

	return box
}
// GetTracerManager returns the *TracerManager currently held by
// globalTracerManager.
func GetTracerManager() *TracerManager {
	holder := globalTracerManager.Load().(tracerManagerHolder)
	return holder.v
}
// SetTracerManager replaces the TracerManager held by globalTracerManager
// with v. When v is already the current manager, nothing is stored.
func SetTracerManager(v *TracerManager) {
	if GetTracerManager() == v {
		// Same pointer as the one already installed; skip the store.
		return
	}

	replacement := tracerManagerHolder{v: v}
	globalTracerManager.Store(replacement)
}