forked from cloudevents/sdk-go
/
to_event.go
128 lines (115 loc) · 3.21 KB
/
to_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package binding
import (
"bytes"
"context"
"errors"
"io"
"github.com/ian-mi/sdk-go/v2/pkg/binding/format"
"github.com/ian-mi/sdk-go/v2/pkg/binding/spec"
"github.com/ian-mi/sdk-go/v2/pkg/event"
"github.com/ian-mi/sdk-go/v2/pkg/types"
)
// ErrCannotConvertToEvent is the generic error returned when the conversion
// of a Message to an Event fails.
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")
// ToEvent converts a Message into an Event.
//
// When the message reports EncodingEvent, the chain of MessageWrapper values
// is unwrapped until an *EventMessage is found; the transformers are applied
// to that same event and it is returned. If the chain ends without reaching
// an *EventMessage, ErrCannotConvertToEvent is returned.
//
// For every other encoding a new Event is populated from the message through
// DirectWrite, and the transformers are then applied to the resulting event.
//
// transformers can be nil; when present they are applied exactly once, to the
// event itself, and are never handed to DirectWrite. On any error the
// returned Event is nil.
func ToEvent(ctx context.Context, message MessageReader, transformers ...TransformerFactory) (*event.Event, error) {
	// transform runs the event-level transformers, if any were supplied,
	// against target.
	transform := func(target *event.Event) error {
		if transformers == nil {
			return nil
		}
		var factories TransformerFactories = transformers
		return factories.EventTransformer()(target)
	}

	if message.ReadEncoding() == EncodingEvent {
		// Walk through the wrappers looking for the underlying *EventMessage.
		current := message
		for current != nil {
			if em, isEvent := current.(*EventMessage); isEvent {
				found := (*event.Event)(em)
				if err := transform(found); err != nil {
					return nil, err
				}
				return found, nil
			}
			wrapper, isWrapper := current.(MessageWrapper)
			if !isWrapper {
				// Neither an event nor a wrapper: nothing left to unwrap.
				break
			}
			current = wrapper.GetWrappedMessage()
		}
		return nil, ErrCannotConvertToEvent
	}

	decoded := event.New()
	builder := &messageToEventBuilder{event: &decoded}

	// The caller's ctx is not forwarded: DirectWrite receives context.TODO()
	// and an empty transformer list, so the transformers only run below.
	_, err := DirectWrite(context.TODO(), message, builder, builder, []TransformerFactory{})
	if err != nil {
		return nil, err
	}

	if err := transform(&decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}
// messageToEventBuilder populates the Event it points to from the values
// handed to its writer methods.
type messageToEventBuilder struct {
	// event is the Event being filled in.
	event *event.Event
}

// Compile-time checks that *messageToEventBuilder satisfies both writer
// interfaces.
var (
	_ StructuredWriter = (*messageToEventBuilder)(nil)
	_ BinaryWriter     = (*messageToEventBuilder)(nil)
)
// SetStructuredEvent reads the whole serialized event from the reader and
// unmarshals it into the builder's Event using the supplied format.
//
// It returns the read error if the payload cannot be fully read, otherwise
// whatever format.Unmarshal returns. ctx is not used.
func (b *messageToEventBuilder) SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error {
	payload := new(bytes.Buffer)
	if _, copyErr := io.Copy(payload, event); copyErr != nil {
		return copyErr
	}
	return format.Unmarshal(payload.Bytes(), b.event)
}
// Start does nothing for this builder: it ignores ctx and always returns nil.
func (b *messageToEventBuilder) Start(ctx context.Context) error {
return nil
}
// End does nothing for this builder: it ignores ctx and always returns nil.
func (b *messageToEventBuilder) End(ctx context.Context) error {
return nil
}
// SetData reads the whole payload from data and stores it as the event's
// encoded data.
//
// When the reader yields no bytes, DataEncoded is left untouched. A read
// error is returned as-is and the event is not modified.
func (b *messageToEventBuilder) SetData(data io.Reader) error {
	var payload bytes.Buffer
	copied, err := io.Copy(&payload, data)
	if err != nil {
		return err
	}
	if copied == 0 {
		// Empty body: keep whatever DataEncoded already holds.
		return nil
	}
	b.event.DataEncoded = payload.Bytes()
	return nil
}
// SetAttribute stores a single context attribute on the event being built.
//
// The spec version attribute is handled specially: instead of being written
// as a value, it switches the event's Context to the matching versioned
// context (V03 or V1). A spec version string that matches neither is reported
// as an error rather than being silently ignored. Every other attribute is
// delegated to attribute.Set.
//
// It returns an error when the spec version value cannot be converted to a
// string, when the spec version is not recognized, or when attribute.Set
// fails.
func (b *messageToEventBuilder) SetAttribute(attribute spec.Attribute, value interface{}) error {
	if attribute.Kind() != spec.SpecVersion {
		return attribute.Set(b.event.Context, value)
	}

	// If spec version we need to change to right context struct
	str, err := types.ToString(value)
	if err != nil {
		return err
	}
	switch str {
	case event.CloudEventsVersionV03:
		b.event.Context = b.event.Context.AsV03()
	case event.CloudEventsVersionV1:
		b.event.Context = b.event.Context.AsV1()
	default:
		// Surface unknown versions instead of keeping the current context
		// and pretending the attribute was applied.
		return errors.New("unrecognized event version " + str)
	}
	return nil
}
// SetExtension validates value with types.Validate and, on success, stores
// the validated result on the event as the extension called name.
//
// A validation failure is returned and the event is left unchanged.
func (b *messageToEventBuilder) SetExtension(name string, value interface{}) error {
	validated, validateErr := types.Validate(value)
	if validateErr != nil {
		return validateErr
	}
	b.event.SetExtension(name, validated)
	return nil
}