-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
130 lines (105 loc) · 3.24 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package events
import (
"bytes"
"compress/zlib"
"encoding/base64"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/sirupsen/logrus"
"github.com/mevansam/goutils/logger"
)
// PublishDataInput is the JSON-serializable envelope for one item to be
// published. NewPublishDataInput builds it from a cloud event.
type PublishDataInput struct {
// Type names the kind of payload; NewPublishDataInput sets it to "event".
Type string `json:"type"`
// Compressed tells whether Payload holds compressed data;
// NewPublishDataInput always sets it to true.
Compressed bool `json:"compressed"`
// Payload is the encoded content. When built by NewPublishDataInput it is
// the event's JSON, zlib-compressed and then base64-encoded (standard
// encoding).
Payload string `json:"payload"`
}
// PublishEventResult is the per-event outcome of a publish request.
// CreateCloudEventErrorList and FilterCloudEventsWithErrors match results to
// cloud events by slice index.
type PublishEventResult struct {
// Success is false when publishing the corresponding event failed.
Success bool `json:"success"`
// Error carries the failure text; it is read by this package only when
// Success is false.
Error string `json:"error"`
}
// CloudEventError pairs a cloud event with the error text reported for it.
// Values are produced by CreateCloudEventErrorList.
type CloudEventError struct {
// Event is the cloud event the error refers to.
Event *cloudevents.Event
// Error is the error text copied from the matching PublishEventResult.
Error string
}
// CreatePublishEventList stamps eventSource as the source of every event in
// cloudEvents and converts each one into a PublishDataInput by calling
// NewPublishDataInput.
//
// The events are modified in place (their source is overwritten). An event
// that NewPublishDataInput cannot convert is left out of the result, so the
// returned slice may be shorter than cloudEvents and its indices then no
// longer line up with the input. The returned slice is never nil.
func CreatePublishEventList(eventSource string, cloudEvents []*cloudevents.Event) []PublishDataInput {
	inputs := make([]PublishDataInput, 0, len(cloudEvents))
	for i := range cloudEvents {
		ce := cloudEvents[i]
		ce.SetSource(eventSource)

		input, err := NewPublishDataInput(ce)
		if err == nil {
			inputs = append(inputs, *input)
		}
	}
	return inputs
}
// MapCloudEventsToPublishDataInputs converts each event in cloudEvents into a
// PublishDataInput by calling NewPublishDataInput, keeping the input order.
//
// An event that NewPublishDataInput cannot convert is left out of the result,
// so the returned slice may be shorter than cloudEvents. The returned slice
// is never nil.
func MapCloudEventsToPublishDataInputs(cloudEvents []*cloudevents.Event) []PublishDataInput {
	inputs := make([]PublishDataInput, 0, len(cloudEvents))
	for idx := 0; idx < len(cloudEvents); idx++ {
		input, convErr := NewPublishDataInput(cloudEvents[idx])
		if convErr == nil {
			inputs = append(inputs, *input)
		}
	}
	return inputs
}
// CreateCloudEventErrorList returns one CloudEventError for every failed
// entry of publishEventErrorList, pairing the reported error text with the
// cloud event found at the same index in cloudEvents.
//
// The two slices are matched purely by position. If a failed result has no
// event at its index (the results slice is longer than cloudEvents), the
// failure is still reported, with a nil Event, rather than panicking on an
// out-of-range index.
//
// The returned slice is never nil; it is empty when no result failed.
func CreateCloudEventErrorList(publishEventErrorList []PublishEventResult, cloudEvents []*cloudevents.Event) []CloudEventError {
	failures := []CloudEventError{}
	for i, result := range publishEventErrorList {
		if result.Success {
			continue
		}

		// Only index into cloudEvents when the position actually exists.
		var event *cloudevents.Event
		if i < len(cloudEvents) {
			event = cloudEvents[i]
		}
		failures = append(failures, CloudEventError{
			Event: event,
			Error: result.Error,
		})
	}
	return failures
}
// FilterCloudEventsWithErrors returns the cloud events whose publish result
// at the same index reports a failure, logging each failure as it goes.
//
// The two slices are matched purely by position. A failed result with no
// event at its index (the results slice is longer than cloudEvents) is logged
// and skipped rather than causing an out-of-range panic; successful results
// never touch cloudEvents.
//
// The returned slice is never nil; it is empty when no result failed.
func FilterCloudEventsWithErrors(publishEventResults []PublishEventResult, cloudEvents []*cloudevents.Event) []*cloudevents.Event {
	eventsWithError := []*cloudevents.Event{}
	for i, result := range publishEventResults {
		if result.Success {
			continue
		}

		if i >= len(cloudEvents) {
			// There is no event to return for this result; keep the error visible.
			logger.ErrorMessage(
				"FilterCloudEventsWithErrors(): Received error: %s; for result at index %d which has no matching cloud event",
				result.Error, i,
			)
			continue
		}

		cloudEvent := cloudEvents[i]
		logger.ErrorMessage(
			"FilterCloudEventsWithErrors(): Received error: %s; when publishing cloud event: %s",
			result.Error, cloudEvent.String(),
		)
		eventsWithError = append(eventsWithError, cloudEvent)
	}
	return eventsWithError
}
// NewPublishDataInput turns a cloud event into a PublishDataInput whose
// Payload is the event's JSON encoding, zlib-compressed and then
// base64-encoded with the standard alphabet. Type is set to "event" and
// Compressed to true.
//
// An error is returned, together with a nil result, when the event cannot be
// marshaled to JSON or when the compressor fails to write or to finalize the
// stream. Each failure is also logged.
func NewPublishDataInput(event *cloudevents.Event) (*PublishDataInput, error) {
	// Rendering the event as a string is only worth it when trace logging is on.
	if logrus.IsLevelEnabled(logrus.TraceLevel) {
		logger.DebugMessage("events.NewPublishDataInput(): Preparing event for posting: %s", event.String())
	}

	eventPayload, err := event.MarshalJSON()
	if err != nil {
		logger.ErrorMessage("events.NewPublishDataInput(): Unable to marshal event: %s", err.Error())
		return nil, err
	}

	// Compress the marshaled event into an in-memory buffer.
	var compressedPayload bytes.Buffer
	zlibWriter := zlib.NewWriter(&compressedPayload)
	if _, err = zlibWriter.Write(eventPayload); err != nil {
		logger.ErrorMessage("events.NewPublishDataInput(): Unable to compress marshaled event: %s", event.String())
		// The write error is the one reported to the caller; a further
		// failure while closing would add nothing, so it is ignored.
		_ = zlibWriter.Close()
		return nil, err
	}
	// Close flushes any data still held by the compressor, so the buffer is
	// only complete if it succeeds.
	if err = zlibWriter.Close(); err != nil {
		logger.ErrorMessage("events.NewPublishDataInput(): Unable to finalize compressed event: %s", err.Error())
		return nil, err
	}

	return &PublishDataInput{
		Type:       "event",
		Compressed: true,
		Payload:    base64.StdEncoding.EncodeToString(compressedPayload.Bytes()),
	}, nil
}