-
Notifications
You must be signed in to change notification settings - Fork 216
/
translate.go
148 lines (130 loc) · 5.43 KB
/
translate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package binding
import (
"context"
"errors"
ce "github.com/cloudevents/sdk-go"
)
const (
SKIP_DIRECT_STRUCTURED_ENCODING = "SKIP_DIRECT_STRUCTURED_ENCODING"
SKIP_DIRECT_BINARY_ENCODING = "SKIP_DIRECT_BINARY_ENCODING"
PREFERRED_EVENT_ENCODING = "PREFERRED_EVENT_ENCODING"
)
// Invokes the encoders. createRootStructuredEncoder and createRootBinaryEncoder could be null if the protocol doesn't support it
//
// Returns:
// * EncodingStructured, nil if message was structured and correctly translated to Event
// * EncodingBinary, nil if message was binary and correctly translated to Event
// * EncodingStructured, err if message was structured but error happened during translation
// * BinaryEncoding, err if message was binary but error happened during translation
// * EncodingUnknown, ErrUnknownEncoding if message is not recognized
func RunDirectEncoding(
ctx context.Context,
message Message,
structuredEncoder StructuredEncoder,
binaryEncoder BinaryEncoder,
factories TransformerFactories,
) (Encoding, error) {
if structuredEncoder != nil && !GetOrDefaultFromCtx(ctx, SKIP_DIRECT_STRUCTURED_ENCODING, false).(bool) {
// Wrap the transformers in the structured builder
structuredEncoder = factories.StructuredTransformer(structuredEncoder)
// StructuredTransformer could return nil if one of transcoders doesn't support
// direct structured transcoding
if structuredEncoder != nil {
if err := message.Structured(ctx, structuredEncoder); err == nil {
return EncodingStructured, nil
} else if err != ErrNotStructured {
return EncodingStructured, err
}
}
}
if binaryEncoder != nil && !GetOrDefaultFromCtx(ctx, SKIP_DIRECT_BINARY_ENCODING, false).(bool) {
binaryEncoder = factories.BinaryTransformer(binaryEncoder)
if binaryEncoder != nil {
if err := message.Binary(ctx, binaryEncoder); err == nil {
return EncodingBinary, nil
} else if err != ErrNotBinary {
return EncodingBinary, err
}
}
}
return EncodingUnknown, ErrUnknownEncoding
}
// This is the full algorithm to encode a Message using transformers:
// 1. It first tries direct encoding using RunEncoders
// 2. If no direct encoding is possible, it goes through ToEvent to generate an event representation
// 3. Using the encoders previously defined
// You can tweak the encoding process using the context decorators WithForceStructured, WithForceStructured, etc.
// This function guarantees that transformers are invoked only one time during the encoding process.
// Returns:
// * EncodingStructured, nil if message was structured and correctly translated to Event
// * EncodingBinary, nil if message was binary and correctly translated to Event
// * EncodingStructured, err if message was structured but error happened during translation
// * BinaryEncoding, err if message was binary but error happened during translation
// * EncodingUnknown, ErrUnknownEncoding if message is not recognized
func Encode(
ctx context.Context,
message Message,
structuredEncoder StructuredEncoder,
binaryEncoder BinaryEncoder,
transformers TransformerFactories,
) (Encoding, error) {
enc := message.Encoding()
var err error
// Skip direct encoding if the event is an event message
if enc != EncodingEvent {
enc, err = RunDirectEncoding(ctx, message, structuredEncoder, binaryEncoder, transformers)
if enc != EncodingUnknown {
// Message directly encoded, nothing else to do here
return enc, err
}
}
var e ce.Event
e, enc, err = ToEvent(ctx, message, transformers)
if err != nil {
return enc, err
}
message = EventMessage(e)
if GetOrDefaultFromCtx(ctx, PREFERRED_EVENT_ENCODING, EncodingBinary).(Encoding) == EncodingStructured {
if structuredEncoder != nil {
return EncodingStructured, message.Structured(ctx, structuredEncoder)
}
if binaryEncoder != nil {
return EncodingBinary, message.Binary(ctx, binaryEncoder)
}
} else {
if binaryEncoder != nil {
return EncodingBinary, message.Binary(ctx, binaryEncoder)
}
if structuredEncoder != nil {
return EncodingStructured, message.Structured(ctx, structuredEncoder)
}
}
return enc, errors.New("cannot find a suitable encoder to use from EventMessage")
}
// Skip direct structured to structured encoding during the encoding process
func WithSkipDirectStructuredEncoding(ctx context.Context, skip bool) context.Context {
return context.WithValue(ctx, SKIP_DIRECT_STRUCTURED_ENCODING, skip)
}
// Skip direct binary to binary encoding during the encoding process
func WithSkipDirectBinaryEncoding(ctx context.Context, skip bool) context.Context {
return context.WithValue(ctx, SKIP_DIRECT_BINARY_ENCODING, skip)
}
// Define the preferred encoding from event to message during the encoding process
func WithPreferredEventEncoding(ctx context.Context, enc Encoding) context.Context {
return context.WithValue(ctx, PREFERRED_EVENT_ENCODING, enc)
}
// Force structured encoding during the encoding process
func WithForceStructured(ctx context.Context) context.Context {
return context.WithValue(context.WithValue(ctx, PREFERRED_EVENT_ENCODING, EncodingStructured), SKIP_DIRECT_BINARY_ENCODING, true)
}
// Force binary encoding during the encoding process
func WithForceBinary(ctx context.Context) context.Context {
return context.WithValue(context.WithValue(ctx, PREFERRED_EVENT_ENCODING, EncodingBinary), SKIP_DIRECT_STRUCTURED_ENCODING, true)
}
func GetOrDefaultFromCtx(ctx context.Context, key string, def interface{}) interface{} {
if val := ctx.Value(key); val != nil {
return val
} else {
return def
}
}